Updated Branches: refs/heads/trunk bb7fb11b6 -> 0f4a66fb0
FLUME-2190. Add a source capable of feeding off of the Twitter Streaming API (Roman Shaposhnik via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0f4a66fb Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0f4a66fb Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0f4a66fb Branch: refs/heads/trunk Commit: 0f4a66fb0f2946cd61dd8df31bd255fef7581cbc Parents: bb7fb11 Author: Hari Shreedharan <[email protected]> Authored: Fri Sep 27 13:21:48 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Sep 27 13:21:48 2013 -0700 ---------------------------------------------------------------------- flume-ng-dist/pom.xml | 4 + flume-ng-doc/sphinx/FlumeUserGuide.rst | 41 +++ flume-ng-sources/flume-twitter-source/pom.xml | 61 ++++ .../flume/source/twitter/TwitterSource.java | 333 +++++++++++++++++++ .../flume/source/twitter/TestTwitterSource.java | 112 +++++++ .../src/test/resources/log4j.properties | 33 ++ .../src/test/resources/twitter-flume.conf | 92 +++++ flume-ng-sources/pom.xml | 1 + pom.xml | 23 ++ 9 files changed, 700 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/0f4a66fb/flume-ng-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml index 83332a9..2d0ee47 100644 --- a/flume-ng-dist/pom.xml +++ b/flume-ng-dist/pom.xml @@ -121,6 +121,10 @@ <artifactId>flume-jms-source</artifactId> </dependency> <dependency> + <groupId>org.apache.flume.flume-ng-sources</groupId> + <artifactId>flume-twitter-source</artifactId> + </dependency> + <dependency> <groupId>org.apache.flume.flume-ng-legacy-sources</groupId> <artifactId>flume-avro-source</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/flume/blob/0f4a66fb/flume-ng-doc/sphinx/FlumeUserGuide.rst 
---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index dac3ce7..007436b 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -977,6 +977,47 @@ Example for an agent named agent-1: agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool agent-1.sources.src-1.fileHeader = true +Twitter 1% firehose Source (experimental) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. warning:: + This source is highly experimental and may change between minor versions of Flume. + Use at your own risk. + +Experimental source that connects via Streaming API to the 1% sample twitter +firehose, continuously downloads tweets, converts them to Avro format and +sends Avro events to a downstream Flume sink. Requires the consumer and +access tokens and secrets of a Twitter developer account. +Required properties are in **bold**. + +================== =========== =================================================== +Property Name Default Description +================== =========== =================================================== +**channels** -- +**type** -- The component type name, needs to be ``org.apache.flume.source.twitter.TwitterSource`` +**consumerKey** -- OAuth consumer key +**consumerSecret** -- OAuth consumer secret +**accessToken** -- OAuth access token +**accessTokenSecret** -- OAuth access token secret +maxBatchSize 1000 Maximum number of twitter messages to put in a single batch +maxBatchDurationMillis 1000 Maximum number of milliseconds to wait before closing a batch +================== =========== =================================================== + +Example for agent named a1: + +.. 
code-block:: properties + + a1.sources = r1 + a1.channels = c1 + a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource + a1.sources.r1.channels = c1 + a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY + a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET + a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN + a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET + a1.sources.r1.maxBatchSize = 10 + a1.sources.r1.maxBatchDurationMillis = 200 + Event Deserializers ''''''''''''''''''' http://git-wip-us.apache.org/repos/asf/flume/blob/0f4a66fb/flume-ng-sources/flume-twitter-source/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-twitter-source/pom.xml b/flume-ng-sources/flume-twitter-source/pom.xml new file mode 100644 index 0000000..a5a27cf --- /dev/null +++ b/flume-ng-sources/flume-twitter-source/pom.xml @@ -0,0 +1,61 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flume-ng-sources</artifactId> + <groupId>org.apache.flume</groupId> + <version>1.5.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.flume.flume-ng-sources</groupId> + <artifactId>flume-twitter-source</artifactId> + <name>Flume Twitter Source</name> + + <dependencies> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-core</artifactId> + </dependency> + <dependency> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-media-support</artifactId> + </dependency> + <dependency> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-stream</artifactId> + </dependency> + </dependencies> + + <build> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flume/blob/0f4a66fb/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java new file mode 100644 index 0000000..27b2c3f --- /dev/null +++ b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.source.twitter; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.text.DecimalFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDrivenSource; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; +import org.apache.flume.conf.Configurable; +import org.apache.flume.event.EventBuilder; +import org.apache.flume.source.AbstractSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import twitter4j.MediaEntity; +import twitter4j.StallWarning; +import twitter4j.Status; +import twitter4j.StatusDeletionNotice; +import twitter4j.StatusListener; +import twitter4j.TwitterStream; +import twitter4j.TwitterStreamFactory; +import twitter4j.User; 
+import twitter4j.auth.AccessToken; + +/** + * Demo Flume source that connects via Streaming API to the 1% sample twitter + * firehose, continuously downloads tweets, converts them to Avro format and + * sends Avro events to a downstream Flume sink. + * + * Requires the consumer and access tokens and secrets of a Twitter developer + * account + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TwitterSource + extends AbstractSource + implements EventDrivenSource, Configurable, StatusListener { + + private TwitterStream twitterStream; + private Schema avroSchema; + + private long docCount = 0; + private long startTime = 0; + private long exceptionCount = 0; + private long totalTextIndexed = 0; + private long skippedDocs = 0; + private long batchEndTime = 0; + private final List<Record> docs = new ArrayList<Record>(); + private final ByteArrayOutputStream serializationBuffer = + new ByteArrayOutputStream(); + private DataFileWriter<GenericRecord> dataFileWriter; + + private int maxBatchSize = 1000; + private int maxBatchDurationMillis = 1000; + + // Fri May 14 02:52:55 +0000 2010 + private SimpleDateFormat formatterTo = + new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + private DecimalFormat numFormatter = new DecimalFormat("###,###.###"); + + private static int REPORT_INTERVAL = 100; + private static int STATS_INTERVAL = REPORT_INTERVAL * 10; + private static final Logger LOGGER = + LoggerFactory.getLogger(TwitterSource.class); + + public TwitterSource() { + } + + @Override + public void configure(Context context) { + String consumerKey = context.getString("consumerKey"); + String consumerSecret = context.getString("consumerSecret"); + String accessToken = context.getString("accessToken"); + String accessTokenSecret = context.getString("accessTokenSecret"); + + LOGGER.info("Consumer Key: '" + consumerKey + "'"); + LOGGER.info("Consumer Secret: '" + consumerSecret + "'"); + LOGGER.info("Access Token: '" + accessToken + "'"); + LOGGER.info("Access Token Secret: 
'" + accessTokenSecret + "'"); + + twitterStream = new TwitterStreamFactory().getInstance(); + twitterStream.setOAuthConsumer(consumerKey, consumerSecret); + twitterStream.setOAuthAccessToken(new AccessToken(accessToken, + accessTokenSecret)); + twitterStream.addListener(this); + avroSchema = createAvroSchema(); + dataFileWriter = new DataFileWriter<GenericRecord>( + new GenericDatumWriter<GenericRecord>(avroSchema)); + + maxBatchSize = context.getInteger("maxBatchSize", maxBatchSize); + maxBatchDurationMillis = context.getInteger("maxBatchDurationMillis", + maxBatchDurationMillis); + } + + @Override + public synchronized void start() { + LOGGER.info("Starting twitter source {} ...", this); + docCount = 0; + startTime = System.currentTimeMillis(); + exceptionCount = 0; + totalTextIndexed = 0; + skippedDocs = 0; + batchEndTime = System.currentTimeMillis() + maxBatchDurationMillis; + twitterStream.sample(); + LOGGER.info("Twitter source {} started.", getName()); + // This should happen at the end of the start method, since this will + // change the lifecycle status of the component to tell the Flume + // framework that this component has started. Doing this any earlier + // tells the framework that the component started successfully, even + // if the method actually fails later. 
+ super.start(); + } + + @Override + public synchronized void stop() { + LOGGER.info("Twitter source {} stopping...", getName()); + twitterStream.shutdown(); + super.stop(); + LOGGER.info("Twitter source {} stopped.", getName()); + } + + public void onStatus(Status status) { + Record doc = extractRecord("", avroSchema, status); + if (doc == null) { + return; // skip + } + docs.add(doc); + if (docs.size() >= maxBatchSize || + System.currentTimeMillis() >= batchEndTime) { + batchEndTime = System.currentTimeMillis() + maxBatchDurationMillis; + byte[] bytes; + try { + bytes = serializeToAvro(avroSchema, docs); + } catch (IOException e) { + LOGGER.error("Exception while serializing tweet", e); + return; //skip + } + Event event = EventBuilder.withBody(bytes); + getChannelProcessor().processEvent(event); // send event to the flume sink + docs.clear(); + } + docCount++; + if ((docCount % REPORT_INTERVAL) == 0) { + LOGGER.info(String.format("Processed %s docs", + numFormatter.format(docCount))); + } + if ((docCount % STATS_INTERVAL) == 0) { + logStats(); + } + } + + private Schema createAvroSchema() { + Schema avroSchema = Schema.createRecord("Doc", "adoc", null, false); + List<Field> fields = new ArrayList<Field>(); + fields.add(new Field("id", Schema.create(Type.STRING), null, null)); + fields.add(new Field("user_friends_count", + createOptional(Schema.create(Type.INT)), + null, null)); + fields.add(new Field("user_location", + createOptional(Schema.create(Type.STRING)), + null, null)); + fields.add(new Field("user_description", + createOptional(Schema.create(Type.STRING)), + null, null)); + fields.add(new Field("user_statuses_count", + createOptional(Schema.create(Type.INT)), + null, null)); + fields.add(new Field("user_followers_count", + createOptional(Schema.create(Type.INT)), + null, null)); + fields.add(new Field("user_name", + createOptional(Schema.create(Type.STRING)), + null, null)); + fields.add(new Field("user_screen_name", + 
createOptional(Schema.create(Type.STRING)), + null, null)); + fields.add(new Field("created_at", + createOptional(Schema.create(Type.STRING)), + null, null)); + fields.add(new Field("text", + createOptional(Schema.create(Type.STRING)), + null, null)); + fields.add(new Field("retweet_count", + createOptional(Schema.create(Type.LONG)), + null, null)); + fields.add(new Field("retweeted", + createOptional(Schema.create(Type.BOOLEAN)), + null, null)); + fields.add(new Field("in_reply_to_user_id", + createOptional(Schema.create(Type.LONG)), + null, null)); + fields.add(new Field("source", + createOptional(Schema.create(Type.STRING)), + null, null)); + fields.add(new Field("in_reply_to_status_id", + createOptional(Schema.create(Type.LONG)), + null, null)); + fields.add(new Field("media_url_https", + createOptional(Schema.create(Type.STRING)), + null, null)); + fields.add(new Field("expanded_url", + createOptional(Schema.create(Type.STRING)), + null, null)); + avroSchema.setFields(fields); + return avroSchema; + } + + private Record extractRecord(String idPrefix, Schema avroSchema, Status status) { + User user = status.getUser(); + Record doc = new Record(avroSchema); + + doc.put("id", idPrefix + status.getId()); + doc.put("created_at", formatterTo.format(status.getCreatedAt())); + doc.put("retweet_count", status.getRetweetCount()); + doc.put("retweeted", status.isRetweet()); + doc.put("in_reply_to_user_id", status.getInReplyToUserId()); + doc.put("in_reply_to_status_id", status.getInReplyToStatusId()); + + addString(doc, "source", status.getSource()); + addString(doc, "text", status.getText()); + + MediaEntity[] mediaEntities = status.getMediaEntities(); + if (mediaEntities.length > 0) { + addString(doc, "media_url_https", mediaEntities[0].getMediaURLHttps()); + addString(doc, "expanded_url", mediaEntities[0].getExpandedURL()); + } + + doc.put("user_friends_count", user.getFriendsCount()); + doc.put("user_statuses_count", user.getStatusesCount()); + 
doc.put("user_followers_count", user.getFollowersCount()); + addString(doc, "user_location", user.getLocation()); + addString(doc, "user_description", user.getDescription()); + addString(doc, "user_screen_name", user.getScreenName()); + addString(doc, "user_name", user.getName()); + return doc; + } + + private byte[] serializeToAvro(Schema avroSchema, List<Record> docList) + throws IOException { + serializationBuffer.reset(); + dataFileWriter.create(avroSchema, serializationBuffer); + for (Record doc2 : docList) { + dataFileWriter.append(doc2); + } + dataFileWriter.close(); + return serializationBuffer.toByteArray(); + } + + private Schema createOptional(Schema schema) { + return Schema.createUnion(Arrays.asList( + new Schema[] { schema, Schema.create(Type.NULL) })); + } + + private void addString(Record doc, String avroField, String val) { + if (val == null) { + return; + } + doc.put(avroField, val); + totalTextIndexed += val.length(); + } + + private void logStats() { + double mbIndexed = totalTextIndexed / (1024 * 1024.0); + long seconds = (System.currentTimeMillis() - startTime) / 1000; + seconds = Math.max(seconds, 1); + LOGGER.info(String.format("Total docs indexed: %s, total skipped docs: %s", + numFormatter.format(docCount), numFormatter.format(skippedDocs))); + LOGGER.info(String.format(" %s docs/second", + numFormatter.format(docCount / seconds))); + LOGGER.info(String.format("Run took %s seconds and processed:", + numFormatter.format(seconds))); + LOGGER.info(String.format(" %s MB/sec sent to index", + numFormatter.format(((float) totalTextIndexed / (1024 * 1024)) / seconds))); + LOGGER.info(String.format(" %s MB text sent to index", + numFormatter.format(mbIndexed))); + LOGGER.info(String.format("There were %s exceptions ignored: ", + numFormatter.format(exceptionCount))); + } + + public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) { + // Do nothing... 
+ } + + public void onScrubGeo(long userId, long upToStatusId) { + // Do nothing... + } + + public void onStallWarning(StallWarning warning) { + // Do nothing... + } + + public void onTrackLimitationNotice(int numberOfLimitedStatuses) { + // Do nothing... + } + + public void onException(Exception e) { + LOGGER.error("Exception while streaming tweets", e); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/0f4a66fb/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java b/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java new file mode 100644 index 0000000..f6cc2c9 --- /dev/null +++ b/flume-ng-sources/flume-twitter-source/src/test/java/org/apache/flume/source/twitter/TestTwitterSource.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flume.source.twitter; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.text.SimpleDateFormat; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.flume.Channel; +import org.apache.flume.ChannelSelector; +import org.apache.flume.Context; +import org.apache.flume.Sink; +import org.apache.flume.SinkRunner; +import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.channel.ReplicatingChannelSelector; +import org.apache.flume.conf.Configurables; +import org.apache.flume.sink.DefaultSinkProcessor; +import org.apache.flume.sink.LoggerSink; +import org.apache.flume.source.twitter.TwitterSource; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestTwitterSource extends Assert { + + @BeforeClass + public static void setUp() { + try { + Assume.assumeNotNull(InetAddress.getByName("stream.twitter.com")); + } catch (UnknownHostException e) { + Assume.assumeTrue(false); // ignore Test if twitter is unreachable + } + } + + @Test + public void testBasic() throws Exception { + String consumerKey = System.getProperty("twitter.consumerKey"); + Assume.assumeNotNull(consumerKey); + + String consumerSecret = System.getProperty("twitter.consumerSecret"); + Assume.assumeNotNull(consumerSecret); + + String accessToken = System.getProperty("twitter.accessToken"); + Assume.assumeNotNull(accessToken); + + String accessTokenSecret = System.getProperty("twitter.accessTokenSecret"); + Assume.assumeNotNull(accessTokenSecret); + + Context context = new Context(); + context.put("consumerKey", consumerKey); + context.put("consumerSecret", consumerSecret); + context.put("accessToken", accessToken); + context.put("accessTokenSecret", accessTokenSecret); + context.put("maxBatchDurationMillis", "1000"); + + TwitterSource source = new TwitterSource(); + 
source.configure(context); + + Map<String, String> channelContext = new HashMap(); + channelContext.put("capacity", "1000000"); + channelContext.put("keep-alive", "0"); // for faster tests + Channel channel = new MemoryChannel(); + Configurables.configure(channel, new Context(channelContext)); + + Sink sink = new LoggerSink(); + sink.setChannel(channel); + sink.start(); + DefaultSinkProcessor proc = new DefaultSinkProcessor(); + proc.setSinks(Collections.singletonList(sink)); + SinkRunner sinkRunner = new SinkRunner(proc); + sinkRunner.start(); + + ChannelSelector rcs = new ReplicatingChannelSelector(); + rcs.setChannels(Collections.singletonList(channel)); + ChannelProcessor chp = new ChannelProcessor(rcs); + source.setChannelProcessor(chp); + source.start(); + + Thread.sleep(5000); + source.stop(); + sinkRunner.stop(); + sink.stop(); + } + + @Test + public void testCarrotDateFormatBug() throws Exception { + SimpleDateFormat formatterFrom = new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy"); + formatterFrom.parse("Fri Oct 26 22:53:55 +0000 2012"); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/0f4a66fb/flume-ng-sources/flume-twitter-source/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-twitter-source/src/test/resources/log4j.properties b/flume-ng-sources/flume-twitter-source/src/test/resources/log4j.properties new file mode 100644 index 0000000..7755024 --- /dev/null +++ b/flume-ng-sources/flume-twitter-source/src/test/resources/log4j.properties @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +log4j.rootLogger=WARN, A1 + +log4j.logger.org.apache.flume.sink=INFO +#log4j.logger.org.apache.flume.sink.solr=DEBUG +log4j.logger.org.apache.solr=INFO +#log4j.logger.org.apache.solr.hadoop=DEBUG +log4j.logger.org.apache.solr.morphline=DEBUG +log4j.logger.org.apache.solr.update.processor.LogUpdateProcessor=WARN +log4j.logger.org.apache.solr.core.SolrCore=WARN +log4j.logger.org.apache.solr.search.SolrIndexSearcher=ERROR + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/flume/blob/0f4a66fb/flume-ng-sources/flume-twitter-source/src/test/resources/twitter-flume.conf ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-twitter-source/src/test/resources/twitter-flume.conf b/flume-ng-sources/flume-twitter-source/src/test/resources/twitter-flume.conf new file mode 100644 index 0000000..72fe4ef --- /dev/null +++ b/flume-ng-sources/flume-twitter-source/src/test/resources/twitter-flume.conf @@ -0,0 +1,92 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# The configuration file needs to define the sources, +# the channels and the sinks. +# Sources, channels and sinks are defined per agent, +# in this case called 'agent' + +agent.sources = twitterSrc +#agent.sources = httpSrc +#agent.sources = spoolSrc +#agent.sources = avroSrc +agent.channels = memoryChannel +agent.sinks = solrSink +#agent.sinks = loggerSink + +agent.sources.twitterSrc.type = org.apache.flume.source.twitter.TwitterSource +agent.sources.twitterSrc.consumerKey = YOUR_TWITTER_CONSUMER_KEY +agent.sources.twitterSrc.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET +agent.sources.twitterSrc.accessToken = YOUR_TWITTER_ACCESS_TOKEN +agent.sources.twitterSrc.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET +agent.sources.twitterSrc.maxBatchDurationMillis = 200 +agent.sources.twitterSrc.channels = memoryChannel + +agent.sources.httpSrc.type = org.apache.flume.source.http.HTTPSource +agent.sources.httpSrc.port = 5140 +agent.sources.httpSrc.handler = org.apache.flume.sink.solr.morphline.BlobHandler +agent.sources.httpSrc.handler.maxBlobLength = 2000000000 +#agent.sources.httpSrc.interceptors = uuidinterceptor +#agent.sources.httpSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder +#agent.sources.httpSrc.interceptors.uuidinterceptor.headerName = id +##agent.sources.httpSrc.interceptors.uuidinterceptor.preserveExisting = false +##agent.sources.httpSrc.interceptors.uuidinterceptor.prefix = myhostname +agent.sources.httpSrc.channels = memoryChannel + +agent.sources.spoolSrc.type = spooldir 
+agent.sources.spoolSrc.spoolDir = /tmp/myspooldir +agent.sources.spoolSrc.ignorePattern = \. +agent.sources.spoolSrc.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder +agent.sources.spoolSrc.deserializer.maxBlobLength = 2000000000 +agent.sources.spoolSrc.batchSize = 1 +agent.sources.spoolSrc.fileHeader = true +agent.sources.spoolSrc.fileHeaderKey = resourceName +#agent.sources.spoolSrc.interceptors = uuidinterceptor +#agent.sources.spoolSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder +#agent.sources.spoolSrc.interceptors.uuidinterceptor.headerName = id +##agent.sources.spoolSrc.interceptors.uuidinterceptor.preserveExisting = false +##agent.sources.spoolSrc.interceptors.uuidinterceptor.prefix = myhostname +agent.sources.spoolSrc.channels = memoryChannel + +agent.sources.avroSrc.type = avro +agent.sources.avroSrc.bind = 127.0.0.1 +agent.sources.avroSrc.port = 10000 +agent.sources.avroSrc.channels = memoryChannel +agent.sources.avroSrc.interceptors = uuidinterceptor +agent.sources.avroSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder +agent.sources.avroSrc.interceptors.uuidinterceptor.headerName = id +#agent.sources.avroSrc.interceptors.uuidinterceptor.preserveExisting = false +#agent.sources.avroSrc.interceptors.uuidinterceptor.prefix = myhostname +#agent.sources.avroSrc.interceptors = morphlineinterceptor +#agent.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder +#agent.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf +#agent.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1 + +agent.channels.memoryChannel.type = memory +agent.channels.memoryChannel.capacity = 10000 +agent.channels.memoryChannel.transactionCapacity = 1000 + +#agent.channels.fileChannel.type = file 
+#agent.channels.fileChannel.capacity = 1000000 +#agent.channels.fileChannel.transactionCapacity = 1000 +#agent.channels.fileChannel.write-timeout = 1 + +agent.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink +agent.sinks.solrSink.channel = memoryChannel +#agent.sinks.solrSink.batchSize = 1000 +#agent.sinks.solrSink.batchDurationMillis = 1000 +agent.sinks.solrSink.morphlineFile = /etc/flume-ng/conf/morphline.conf +#agent.sinks.solrSink.morphlineId = morphline1 + +#agent.sinks.loggerSink.type = logger +#agent.sinks.loggerSink.channel = memoryChannel http://git-wip-us.apache.org/repos/asf/flume/blob/0f4a66fb/flume-ng-sources/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sources/pom.xml b/flume-ng-sources/pom.xml index 6006fa1..0b57d6d 100644 --- a/flume-ng-sources/pom.xml +++ b/flume-ng-sources/pom.xml @@ -43,6 +43,7 @@ limitations under the License. <modules> <module>flume-scribe-source</module> <module>flume-jms-source</module> + <module>flume-twitter-source</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/flume/blob/0f4a66fb/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8b36402..f0fd22e 100644 --- a/pom.xml +++ b/pom.xml @@ -968,6 +968,12 @@ limitations under the License. </dependency> <dependency> + <groupId>org.apache.flume.flume-ng-sources</groupId> + <artifactId>flume-twitter-source</artifactId> + <version>1.5.0-SNAPSHOT</version> + </dependency> + + <dependency> <groupId>org.apache.flume.flume-ng-legacy-sources</groupId> <artifactId>flume-thrift-source</artifactId> <version>1.5.0-SNAPSHOT</version> @@ -1058,6 +1064,23 @@ limitations under the License. 
<version>4.3.0</version> </dependency> + <!-- Dependencies of the Twitter source --> + <dependency> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-core</artifactId> + <version>3.0.3</version> + </dependency> + <dependency> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-media-support</artifactId> + <version>3.0.3</version> + </dependency> + <dependency> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-stream</artifactId> + <version>3.0.3</version> + </dependency> + </dependencies> </dependencyManagement>
