[streaming] Twitter connector prototype
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b3cd5fd0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b3cd5fd0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b3cd5fd0 Branch: refs/heads/master Commit: b3cd5fd0262dec5277743d1e3f80548bc104dd8e Parents: ee7c4a8 Author: Eszes Dávid <[email protected]> Authored: Fri Aug 1 11:54:05 2014 +0200 Committer: Stephan Ewen <[email protected]> Committed: Mon Aug 18 16:22:11 2014 +0200 ---------------------------------------------------------------------- .../connectors/twitter/TwitterLocal.java | 106 ++++++++ .../connectors/twitter/TwitterSource.java | 243 +++++++++++++++++++ .../connectors/twitter/TwitterStreaming.java | 107 ++++++++ 3 files changed, 456 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3cd5fd0/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java new file mode 100644 index 0000000..138fe05 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java @@ -0,0 +1,106 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.examples.function.JSONParseFlatMap;
+import org.apache.flink.streaming.examples.wordcount.WordCountCounter;
+import org.apache.flink.util.Collector;
+
+/**
+ * This program demonstrates the use of {@link TwitterSource}.
+ * It collects a finite number of tweets and counts how often each tweet
+ * language occurs.
+ */
+public class TwitterLocal {
+
+	private static final int PARALLELISM = 1;
+	private static final int SOURCE_PARALLELISM = 1;
+	/** Number of tweets requested when none is given on the command line. */
+	private static final int DEFAULT_NUMBER_OF_TWEETS = 100;
+
+	/**
+	 * FlatMapFunction to determine the language of tweets if possible.
+	 * Tweets without a "lang" field are mapped to a single space.
+	 */
+	public static class SelectLanguageFlatMap extends
+			JSONParseFlatMap<Tuple1<String>, Tuple1<String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * Selects the "lang" field from the incoming JSON text.
+		 */
+		@Override
+		public void flatMap(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception {
+			out.collect(new Tuple1<String>(colationOfNull(getField(value.f0, "lang"))));
+		}
+
+		/**
+		 * Replaces a null String with a single space character. Useful when
+		 * null is undesirable.
+		 *
+		 * @param in
+		 *            the string to check, may be null
+		 * @return {@code in} itself, or {@code " "} if {@code in} is null
+		 */
+		protected String colationOfNull(String in) {
+			if (in == null) {
+				return " ";
+			}
+			return in;
+		}
+	}
+
+	/**
+	 * Runs the language-count job in a local environment and prints the
+	 * running counts to standard output.
+	 *
+	 * @param args
+	 *            {@code <pathToPropertiesFile> [numberOfTweets]}; the
+	 *            properties file holds the Twitter authentication data and
+	 *            the optional second argument must be a positive integer
+	 *            (defaults to 100)
+	 */
+	public static void main(String[] args) {
+
+		if (args == null || args.length < 1 || args.length > 2) {
+			printUsage();
+			return;
+		}
+
+		String path = args[0];
+
+		int numberOfTweets = DEFAULT_NUMBER_OF_TWEETS;
+		if (args.length == 2) {
+			try {
+				numberOfTweets = Integer.parseInt(args[1]);
+			} catch (NumberFormatException e) {
+				printUsage();
+				return;
+			}
+			if (numberOfTweets <= 0) {
+				printUsage();
+				return;
+			}
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		DataStream<Tuple1<String>> streamSource = env.addSource(
+				new TwitterSource(path, numberOfTweets), SOURCE_PARALLELISM);
+
+		DataStream<Tuple2<String, Integer>> dataStream = streamSource
+				.flatMap(new SelectLanguageFlatMap())
+				.partitionBy(0)
+				.map(new WordCountCounter());
+
+		dataStream.addSink(new SinkFunction<Tuple2<String, Integer>>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void invoke(Tuple2<String, Integer> tuple) {
+				System.out.println(tuple);
+			}
+		});
+
+		env.execute();
+	}
+
+	/** Prints the command-line usage of this program to standard error. */
+	private static void printUsage() {
+		System.err.println("USAGE:\n TwitterLocal <pathToPropertiesFile> [numberOfTweets]");
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3cd5fd0/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
new file mode 100644
index 0000000..bbff732
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -0,0 +1,243 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.streaming.connectors.twitter; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.streaming.api.DataStream; +import org.apache.flink.streaming.api.function.source.SourceFunction; +import org.apache.flink.util.Collector; + +import com.twitter.hbc.ClientBuilder; +import com.twitter.hbc.core.Constants; +import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint; +import com.twitter.hbc.core.processor.StringDelimitedProcessor; +import com.twitter.hbc.httpclient.BasicClient; +import com.twitter.hbc.httpclient.auth.Authentication; +import com.twitter.hbc.httpclient.auth.OAuth1; + +/** + * Implementation of {@link SourceFunction} specialized to emit tweets from Twitter. 
+ * It can connect to the Twitter Streaming API, collect tweets and emit each
+ * received message as a {@link Tuple1} of {@link String}, either
+ * continuously or for a fixed number of polling attempts.
+ */
+public class TwitterSource extends SourceFunction<Tuple1<String>> {
+
+	private static final Log LOG = LogFactory.getLog(TwitterSource.class);
+
+	private static final long serialVersionUID = 1L;
+	private String authPath;
+	private transient BlockingQueue<String> queue;
+	private int queueSize = 10000;
+	private transient BasicClient client;
+	private int waitSec = 5;
+
+	private boolean streaming;
+	private int numberOfTweets;
+
+	/**
+	 * Create {@link TwitterSource} for streaming. Tweets are emitted until
+	 * collection is aborted by an exception (thread interruption or loss of
+	 * the client connection).
+	 *
+	 * @param authPath
+	 *            Location of the properties file containing the required
+	 *            authentication information.
+	 */
+	public TwitterSource(String authPath) {
+		this.authPath = authPath;
+		streaming = true;
+	}
+
+	/**
+	 * Create {@link TwitterSource} to collect a finite number of tweets.
+	 *
+	 * @param authPath
+	 *            Location of the properties file containing the required
+	 *            authentication information.
+	 * @param numberOfTweets
+	 *            number of polling attempts; each attempt emits at most one
+	 *            tweet, so fewer tweets are emitted if some polls time out.
+	 */
+	public TwitterSource(String authPath, int numberOfTweets) {
+		this.authPath = authPath;
+		streaming = false;
+		this.numberOfTweets = numberOfTweets;
+	}
+
+	/**
+	 * Connects to Twitter and emits tweets into the collector. The connection
+	 * is always closed afterwards, even when initialization or collection
+	 * fails.
+	 */
+	@Override
+	public void invoke(Collector<Tuple1<String>> collector) throws Exception {
+		try {
+			initializeConnection();
+
+			if (streaming) {
+				collectMessages(collector);
+			} else {
+				collectMessages(collector, numberOfTweets);
+			}
+		} finally {
+			closeConnection();
+		}
+	}
+
+	/**
+	 * Initialize Hosebird Client to be able to consume Twitter's Streaming API
+	 */
+	private void initializeConnection() {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Initializing Twitter Streaming API connection");
+		}
+
+		queue = new LinkedBlockingQueue<String>(queueSize);
+
+		StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
+		endpoint.stallWarnings(false);
+
+		Authentication auth = authenticate();
+
+		initializeClient(endpoint, auth);
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Twitter Streaming API connection established successfully");
+		}
+	}
+
+	/**
+	 * Builds OAuth1 credentials from the properties file. Reads the keys
+	 * consumerKey, consumerSecret, token and secret.
+	 */
+	private OAuth1 authenticate() {
+
+		Properties authenticationProperties = loadAuthenticationProperties();
+
+		return new OAuth1(authenticationProperties.getProperty("consumerKey"),
+				authenticationProperties.getProperty("consumerSecret"),
+				authenticationProperties.getProperty("token"),
+				authenticationProperties.getProperty("secret"));
+	}
+
+	/**
+	 * Reads the given properties file for the authentication data.
+	 *
+	 * @return the authentication data.
+	 * @throws RuntimeException
+	 *             if the file cannot be opened or read
+	 */
+	private Properties loadAuthenticationProperties() {
+		Properties properties = new Properties();
+		InputStream input = null;
+		try {
+			input = new FileInputStream(authPath);
+			properties.load(input);
+		} catch (IOException ioe) {
+			// Previously the exception was constructed but never thrown, which
+			// silently produced empty credentials.
+			throw new RuntimeException("Cannot open .properties file: " + authPath, ioe);
+		} finally {
+			if (input != null) {
+				try {
+					input.close();
+				} catch (IOException ignored) {
+					// Either the properties are already loaded or we are failing anyway.
+				}
+			}
+		}
+		return properties;
+	}
+
+	/** Builds the Hosebird client feeding {@link #queue} and connects it. */
+	private void initializeClient(StatusesSampleEndpoint endpoint,
+			Authentication auth) {
+
+		client = new ClientBuilder().name("twitterSourceClient")
+				.hosts(Constants.STREAM_HOST).endpoint(endpoint)
+				.authentication(auth)
+				.processor(new StringDelimitedProcessor(queue)).build();
+
+		client.connect();
+	}
+
+	/**
+	 * Put tweets into collector: performs {@code piece} polling attempts,
+	 * each emitting at most one tweet.
+	 *
+	 * @param collector
+	 *            the collector receiving the tweets
+	 * @param piece
+	 *            number of polling attempts
+	 */
+	protected void collectMessages(Collector<Tuple1<String>> collector, int piece) {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Collecting tweets");
+		}
+
+		for (int i = 0; i < piece; i++) {
+			collectOneMessage(collector);
+		}
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Collecting tweets finished");
+		}
+	}
+
+	/**
+	 * Put tweets into collector until an exception ends the loop (thread
+	 * interruption or a closed client connection).
+	 *
+	 * @param collector
+	 *            the collector receiving the tweets
+	 */
+	protected void collectMessages(Collector<Tuple1<String>> collector) {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Tweet-stream begins");
+		}
+
+		while (true) {
+			collectOneMessage(collector);
+		}
+	}
+
+	/**
+	 * Put one tweet into the collector. Waits up to {@code waitSec} seconds
+	 * for a message and emits nothing if none arrives.
+	 *
+	 * @param collector
+	 *            the collector receiving the tweet
+	 * @throws IllegalStateException
+	 *             if the client connection is closed and no buffered
+	 *             messages remain
+	 * @throws RuntimeException
+	 *             if the thread is interrupted while waiting (the interrupt
+	 *             flag is restored)
+	 */
+	protected void collectOneMessage(Collector<Tuple1<String>> collector) {
+		// A finished client will never deliver new messages; once the buffer is
+		// drained, fail instead of polling a dead connection forever.
+		if (client.isDone() && queue.isEmpty()) {
+			String reason = "Client connection closed unexpectedly: "
+					+ client.getExitEvent().getMessage();
+			if (LOG.isErrorEnabled()) {
+				LOG.error(reason);
+			}
+			throw new IllegalStateException(reason);
+		}
+
+		try {
+			String msg = queue.poll(waitSec, TimeUnit.SECONDS);
+			if (msg != null) {
+				collector.collect(new Tuple1<String>(msg));
+			} else if (LOG.isInfoEnabled()) {
+				LOG.info("Did not receive a message in " + waitSec + " seconds");
+			}
+		} catch (InterruptedException e) {
+			// Restore the interrupt flag and abort; previously the exception was
+			// created but never thrown, so interruption was silently ignored.
+			Thread.currentThread().interrupt();
+			throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
+		}
+	}
+
+	/** Stops the Hosebird client if it was created. */
+	private void closeConnection() {
+
+		if (client == null) {
+			return;
+		}
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Initiating connection close");
+		}
+
+		client.stop();
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Connection closed successfully");
+		}
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3cd5fd0/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
new file mode 100644
index 0000000..805bf06
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.examples.function.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+
+/**
+ * This program demonstrates the use of {@link TwitterSource}: it collects a
+ * finite number of tweets, extracts a few fields from each and prints them.
+ */
+public class TwitterStreaming {
+
+	private static final int PARALLELISM = 1;
+	private static final int SOURCE_PARALLELISM = 1;
+	/** Number of polling attempts handed to the {@link TwitterSource}. */
+	private static final int NUMBER_OF_TWEETS = 100;
+
+	/**
+	 * Prints the selected tweet fields: id, encoded date and language on one
+	 * line, followed by the user name, the text and a blank separator line.
+	 */
+	public static class TwitterSink extends SinkFunction<Tuple5<Long, Long, String, String, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(Tuple5<Long, Long, String, String, String> tuple) {
+			System.out.println(tuple.f0 + " " + tuple.f1 + " " + tuple.f4);
+			System.out.println("NAME: " + tuple.f2);
+			System.out.println(tuple.f3);
+			System.out.println(" ");
+		}
+
+	}
+
+	/**
+	 * Extracts (id, encoded creation date, user name, text, language) from the
+	 * JSON text of a tweet. Missing string fields become a single space and
+	 * missing numeric fields become 0.
+	 */
+	public static class SelectDataFlatMap extends
+			JSONParseFlatMap<Tuple1<String>, Tuple5<Long, Long, String, String, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(Tuple1<String> value,
+				Collector<Tuple5<Long, Long, String, String, String>> out)
+				throws Exception {
+
+			out.collect(new Tuple5<Long, Long, String, String, String>(
+					convertDateString2Long(getField(value.f0, "id")),
+					convertDateString2LongDate(getField(value.f0, "created_at")),
+					colationOfNull(getField(value.f0, "user.name")),
+					colationOfNull(getField(value.f0, "text")),
+					// Coalesce like the other string fields (and TwitterLocal) so
+					// the tuple never carries a null language.
+					colationOfNull(getField(value.f0, "lang"))));
+		}
+
+		/**
+		 * Replaces a null String with a single space character.
+		 *
+		 * @param in
+		 *            the string to check, may be null
+		 * @return {@code in} itself, or {@code " "} if {@code in} is null
+		 */
+		protected String colationOfNull(String in) {
+			if (in == null) {
+				return " ";
+			}
+			return in;
+		}
+
+		/**
+		 * Encodes a date string as {@code token[2] * 100000 + token[5]}, where
+		 * the tokens are the space-separated parts of the string. For a date
+		 * such as "Wed Aug 27 13:08:45 +0000 2008" this yields 2702008 (day of
+		 * month, then year). NOTE(review): assumes Twitter's created_at
+		 * format; confirm this encoding is the intended one.
+		 *
+		 * @param dateString
+		 *            the date text, may be null
+		 * @return the encoded value, or 0 if the string is null or malformed
+		 */
+		protected Long convertDateString2LongDate(String dateString) {
+			if (dateString == null) {
+				return 0L;
+			}
+			String[] dateArray = dateString.split(" ");
+			if (dateArray.length < 6) {
+				return 0L;
+			}
+			try {
+				return Long.parseLong(dateArray[2]) * 100000 + Long.parseLong(dateArray[5]);
+			} catch (NumberFormatException e) {
+				return 0L;
+			}
+		}
+
+		/**
+		 * Parses a numeric string (used for the tweet id).
+		 *
+		 * @param dateString
+		 *            the numeric text, may be null
+		 * @return the parsed value, or 0 if the string is null
+		 */
+		protected Long convertDateString2Long(String dateString) {
+			if (dateString != null) {
+				return Long.parseLong(dateString);
+			}
+			return 0L;
+		}
+	}
+
+	/**
+	 * Runs the field-extraction job locally and prints the selected fields.
+	 *
+	 * @param args
+	 *            {@code <pathToPropertiesFile>}: the file holding the Twitter
+	 *            authentication data
+	 */
+	public static void main(String[] args) {
+
+		// The path used to be hard-coded to a developer's home directory.
+		if (args == null || args.length != 1) {
+			System.err.println("USAGE:\n TwitterStreaming <pathToPropertiesFile>");
+			return;
+		}
+
+		String path = args[0];
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		DataStream<Tuple1<String>> streamSource = env.addSource(
+				new TwitterSource(path, NUMBER_OF_TWEETS), SOURCE_PARALLELISM);
+
+		DataStream<Tuple5<Long, Long, String, String, String>> selectedDataStream = streamSource
+				.flatMap(new SelectDataFlatMap());
+
+		selectedDataStream.addSink(new TwitterSink());
+
+		env.execute();
+	}
+}
