[streaming] Twitter connector prototype

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b3cd5fd0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b3cd5fd0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b3cd5fd0

Branch: refs/heads/master
Commit: b3cd5fd0262dec5277743d1e3f80548bc104dd8e
Parents: ee7c4a8
Author: Eszes Dávid <[email protected]>
Authored: Fri Aug 1 11:54:05 2014 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Aug 18 16:22:11 2014 +0200

----------------------------------------------------------------------
 .../connectors/twitter/TwitterLocal.java        | 106 ++++++++
 .../connectors/twitter/TwitterSource.java       | 243 +++++++++++++++++++
 .../connectors/twitter/TwitterStreaming.java    | 107 ++++++++
 3 files changed, 456 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3cd5fd0/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
new file mode 100644
index 0000000..138fe05
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.examples.function.JSONParseFlatMap;
+import org.apache.flink.streaming.examples.wordcount.WordCountCounter;
+import org.apache.flink.util.Collector;
+
+/**
+ * Demonstrates the use of {@link TwitterSource}.
+ * Its aim is to count the frequency of the languages of the collected tweets.
+ */
+public class TwitterLocal {
+
+       private static final int PARALLELISM = 1;
+       private static final int SOURCE_PARALLELISM = 1;
+       /** Number of tweets requested from the {@link TwitterSource}. */
+       private static final int NUMBER_OF_TWEETS = 100;
+
+       /**
+        * FlatMapFunction that determines the language of a tweet, if possible.
+        */
+       public static class SelectLanguageFlatMap extends
+                       JSONParseFlatMap<Tuple1<String>, Tuple1<String>> {
+
+               private static final long serialVersionUID = 1L;
+
+               /**
+                * Emits the "lang" field of the incoming JSON text. A single space
+                * is emitted instead when the field lookup yields null.
+                *
+                * @param value
+                *            tuple holding the JSON text of one tweet
+                * @param out
+                *            collector receiving the language tuple
+                */
+               @Override
+               public void flatMap(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception {
+
+                       out.collect(new Tuple1<String>(colationOfNull(getField(value.f0, "lang"))));
+               }
+
+               /**
+                * Replaces a null String with a single space character. Useful when
+                * null is undesirable downstream.
+                *
+                * @param in
+                *            the String to check, may be null
+                * @return a single space if {@code in} is null, otherwise {@code in}
+                */
+               protected String colationOfNull(String in) {
+                       if (in == null) {
+                               return " ";
+                       }
+                       return in;
+               }
+       }
+
+       /**
+        * Builds and runs the language-count job on a local environment and
+        * prints every resulting tuple to standard output.
+        *
+        * @param args
+        *            exactly one element: the path of the properties file that
+        *            holds the Twitter authentication data
+        */
+       public static void main(String[] args) {
+
+               // Guard clause: without exactly one argument there is nothing to run.
+               if (args == null || args.length != 1) {
+                       System.err.println("USAGE:\n TwitterLocal <pathToPropertiesFile>");
+                       return;
+               }
+               String path = args[0];
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment
+                               .createLocalEnvironment(PARALLELISM);
+
+               DataStream<Tuple1<String>> streamSource = env.addSource(
+                               new TwitterSource(path, NUMBER_OF_TWEETS), SOURCE_PARALLELISM);
+
+               // Partition on the language field (index 0) before counting.
+               DataStream<Tuple2<String, Integer>> dataStream = streamSource
+                               .flatMap(new SelectLanguageFlatMap())
+                               .partitionBy(0)
+                               .map(new WordCountCounter());
+
+               dataStream.addSink(new SinkFunction<Tuple2<String, Integer>>() {
+
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void invoke(Tuple2<String, Integer> tuple) {
+                               System.out.println(tuple);
+                       }
+               });
+
+               env.execute();
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3cd5fd0/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
new file mode 100644
index 0000000..bbff732
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -0,0 +1,243 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import com.twitter.hbc.httpclient.BasicClient;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+
+/**
+ * Implementation of {@link SourceFunction} specialized to emit tweets from
+ * Twitter. It connects to the Twitter Streaming API through the Hosebird
+ * client and emits every received message as a {@link Tuple1} holding the
+ * message String.
+ */
+public class TwitterSource extends SourceFunction<Tuple1<String>> {
+
+       private static final Log LOG = LogFactory.getLog(TwitterSource.class);
+
+       private static final long serialVersionUID = 1L;
+       private String authPath;
+       // Buffer between the Hosebird client and this source; created in
+       // initializeConnection().
+       private transient BlockingQueue<String> queue;
+       private int queueSize = 10000;
+       private transient BasicClient client;
+       // Maximum time to wait for one message before giving up on that poll.
+       private int waitSec = 5;
+
+       // true: emit until stopped; false: perform numberOfTweets polls.
+       private boolean streaming;
+       private int numberOfTweets;
+
+       /**
+        * Create {@link TwitterSource} for streaming.
+        *
+        * @param authPath
+        *            Location of the properties file containing the required
+        *            authentication information. The keys read from it are
+        *            "consumerKey", "consumerSecret", "token" and "secret".
+        */
+       public TwitterSource(String authPath) {
+               this.authPath = authPath;
+               streaming = true;
+       }
+
+       /**
+        * Create {@link TwitterSource} to collect a finite number of tweets.
+        *
+        * @param authPath
+        *            Location of the properties file containing the required
+        *            authentication information. The keys read from it are
+        *            "consumerKey", "consumerSecret", "token" and "secret".
+        * @param numberOfTweets
+        *            Number of polls performed against the tweet queue. A poll
+        *            that times out emits nothing but still counts.
+        */
+       public TwitterSource(String authPath, int numberOfTweets) {
+               this.authPath = authPath;
+               streaming = false;
+               this.numberOfTweets = numberOfTweets;
+       }
+
+       /**
+        * Connects to Twitter, forwards the received messages to the collector
+        * and stops the client afterwards, even if collecting fails.
+        *
+        * @param collector
+        *            receives one {@link Tuple1} per message
+        */
+       @Override
+       public void invoke(Collector<Tuple1<String>> collector) throws Exception {
+
+               try {
+                       initializeConnection();
+
+                       if (streaming) {
+                               collectMessages(collector);
+                       } else {
+                               collectMessages(collector, numberOfTweets);
+                       }
+               } finally {
+                       // Always release the client, also on failure or interruption.
+                       closeConnection();
+               }
+       }
+
+       /**
+        * Initialize Hosebird Client to be able to consume Twitter's Streaming
+        * API.
+        */
+       private void initializeConnection() {
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Initializing Twitter Streaming API connection");
+               }
+
+               queue = new LinkedBlockingQueue<String>(queueSize);
+
+               StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
+               endpoint.stallWarnings(false);
+
+               Authentication auth = authenticate();
+
+               initializeClient(endpoint, auth);
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Twitter Streaming API connection established successfully");
+               }
+       }
+
+       /**
+        * Builds the OAuth1 credentials from the properties file.
+        *
+        * @return the credentials built from the "consumerKey",
+        *         "consumerSecret", "token" and "secret" properties
+        */
+       private OAuth1 authenticate() {
+
+               Properties authenticationProperties = loadAuthenticationProperties();
+
+               return new OAuth1(authenticationProperties.getProperty("consumerKey"),
+                               authenticationProperties.getProperty("consumerSecret"),
+                               authenticationProperties.getProperty("token"),
+                               authenticationProperties.getProperty("secret"));
+       }
+
+       /**
+        * Reads the given properties file for the authentication data.
+        *
+        * @return the authentication data.
+        * @throws RuntimeException
+        *             if the file cannot be opened or read
+        */
+       private Properties loadAuthenticationProperties() {
+               Properties properties = new Properties();
+               InputStream input = null;
+               try {
+                       input = new FileInputStream(authPath);
+                       properties.load(input);
+               } catch (IOException ioe) {
+                       throw new RuntimeException("Cannot open .properties file: " + authPath,
+                                       ioe);
+               } finally {
+                       if (input != null) {
+                               try {
+                                       input.close();
+                               } catch (IOException ignored) {
+                                       // The properties are already loaded (or the load failure
+                                       // is being reported); a failing close adds nothing.
+                               }
+                       }
+               }
+               return properties;
+       }
+
+       /**
+        * Builds the Hosebird client that writes into {@link #queue} and
+        * connects it.
+        *
+        * @param endpoint
+        *            the endpoint to consume
+        * @param auth
+        *            the credentials used for the connection
+        */
+       private void initializeClient(StatusesSampleEndpoint endpoint,
+                       Authentication auth) {
+
+               client = new ClientBuilder().name("twitterSourceClient")
+                               .hosts(Constants.STREAM_HOST).endpoint(endpoint)
+                               .authentication(auth)
+                               .processor(new StringDelimitedProcessor(queue)).build();
+
+               client.connect();
+       }
+
+       /**
+        * Put tweets into collector.
+        *
+        * @param collector
+        *            receives the tweets
+        * @param piece
+        *            number of polls to perform; each poll emits at most one
+        *            tweet
+        */
+       protected void collectMessages(Collector<Tuple1<String>> collector, int piece) {
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Collecting tweets");
+               }
+
+               for (int i = 0; i < piece; i++) {
+                       collectOneMessage(collector);
+               }
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Collecting tweets finished");
+               }
+       }
+
+       /**
+        * Put tweets into collector without an upper bound. This method only
+        * returns by throwing.
+        *
+        * @param collector
+        *            receives the tweets
+        */
+       protected void collectMessages(Collector<Tuple1<String>> collector) {
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Tweet-stream begins");
+               }
+
+               // NOTE(review): the loop keeps polling even after the client
+               // reports isDone(); confirm whether it should terminate then.
+               while (true) {
+                       collectOneMessage(collector);
+               }
+       }
+
+       /**
+        * Put one tweet into the collector. Waits at most {@link #waitSec}
+        * seconds for a message; nothing is emitted if none arrives in time.
+        *
+        * @param collector
+        *            receives the tweet
+        * @throws RuntimeException
+        *             if the thread is interrupted while waiting
+        */
+       protected void collectOneMessage(Collector<Tuple1<String>> collector) {
+               if (client.isDone()) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Client connection closed unexpectedly: "
+                                               + client.getExitEvent().getMessage());
+                       }
+               }
+
+               try {
+                       String msg = queue.poll(waitSec, TimeUnit.SECONDS);
+                       if (msg != null) {
+                               collector.collect(new Tuple1<String>(msg));
+                       } else {
+                               if (LOG.isInfoEnabled()) {
+                                       LOG.info("Did not receive a message in " + waitSec
+                                                       + " seconds");
+                               }
+                       }
+               } catch (InterruptedException e) {
+                       // Restore the interrupt flag so callers further up can see it.
+                       Thread.currentThread().interrupt();
+                       throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
+               }
+
+       }
+
+       /**
+        * Stops the Hosebird client, if one has been created.
+        */
+       private void closeConnection() {
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Initiating connection close");
+               }
+
+               // client is still null when initialization failed before it was built.
+               if (client != null) {
+                       client.stop();
+               }
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Connection closed successfully");
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3cd5fd0/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
new file mode 100644
index 0000000..805bf06
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.examples.function.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+
+/**
+ * Example job that reads tweets through {@link TwitterSource}, selects a few
+ * fields of every tweet and prints them to standard output.
+ */
+public class TwitterStreaming {
+
+       private static final int PARALLELISM = 1;
+       private static final int SOURCE_PARALLELISM = 1;
+       /** Number of tweets requested from the {@link TwitterSource}. */
+       private static final int NUMBER_OF_TWEETS = 100;
+       /** Properties file used when no path is given on the command line. */
+       private static final String DEFAULT_AUTH_PATH = "/home/eszes/git/auth.properties";
+
+       /**
+        * Sink that prints the selected tweet fields to standard output.
+        */
+       public static class TwitterSink extends SinkFunction<Tuple5<Long, Long, String, String, String>> {
+
+               private static final long serialVersionUID = 1L;
+
+               /**
+                * Prints one tuple: first f0, f1 and f4 on one line, then f2
+                * prefixed with "NAME: ", then f3, then a separator line.
+                *
+                * @param tuple
+                *            the selected tweet fields
+                */
+               @Override
+               public void invoke(Tuple5<Long, Long, String, String, String> tuple) {
+                       System.out.println(tuple.f0 + " " + tuple.f1 + " " + tuple.f4);
+                       System.out.println("NAME: " + tuple.f2);
+                       System.out.println(tuple.f3);
+                       System.out.println(" ");
+               }
+
+       }
+
+       /**
+        * FlatMapFunction that selects the "id", "created_at", "user.name",
+        * "text" and "lang" fields of the incoming JSON text.
+        */
+       public static class SelectDataFlatMap extends
+                       JSONParseFlatMap<Tuple1<String>, Tuple5<Long, Long, String, String, String>> {
+
+               private static final long serialVersionUID = 1L;
+
+               /**
+                * Emits one tuple of (id, converted created_at, user name, text,
+                * lang) per incoming tweet. The "lang" field is passed on as
+                * returned by the field lookup, so it may be null.
+                *
+                * @param value
+                *            tuple holding the JSON text of one tweet
+                * @param out
+                *            collector receiving the selected fields
+                */
+               @Override
+               public void flatMap(Tuple1<String> value,
+                               Collector<Tuple5<Long, Long, String, String, String>> out)
+                               throws Exception {
+
+                       out.collect(new Tuple5<Long, Long, String, String, String>(
+                                       convertDateString2Long(getField(value.f0, "id")),
+                                       convertDateString2LongDate(getField(value.f0, "created_at")),
+                                       colationOfNull(getField(value.f0, "user.name")),
+                                       colationOfNull(getField(value.f0, "text")),
+                                       getField(value.f0, "lang")));
+               }
+
+               /**
+                * Replaces a null String with a single space character.
+                *
+                * @param in
+                *            the String to check, may be null
+                * @return a single space if {@code in} is null, otherwise {@code in}
+                */
+               protected String colationOfNull(String in) {
+                       if (in == null) {
+                               return " ";
+                       }
+                       return in;
+               }
+
+               /**
+                * Converts a space-separated date String to a number computed as
+                * (third token) * 100000 + (sixth token).
+                * NOTE(review): for Twitter's "created_at" these tokens are
+                * presumably the day of month and the year - confirm.
+                *
+                * @param dateString
+                *            the date String, may be null
+                * @return the computed number, or 0 if {@code dateString} is null
+                * @throws ArrayIndexOutOfBoundsException
+                *             if the String has fewer than six tokens
+                * @throws NumberFormatException
+                *             if the third or sixth token is not a number
+                */
+               protected Long convertDateString2LongDate(String dateString) {
+                       if (dateString != null) {
+                               String[] dateArray = dateString.split(" ");
+                               return Long.parseLong(dateArray[2]) * 100000 + Long.parseLong(dateArray[5]);
+                       }
+                       return 0L;
+               }
+
+               /**
+                * Parses a decimal String into a Long. Used for the "id" field.
+                *
+                * @param dateString
+                *            the String to parse, may be null
+                * @return the parsed value, or 0 if {@code dateString} is null
+                * @throws NumberFormatException
+                *             if the String is not a parsable long
+                */
+               protected Long convertDateString2Long(String dateString) {
+                       if (dateString != null) {
+                               return Long.parseLong(dateString);
+                       }
+                       return 0L;
+               }
+       }
+
+       /**
+        * Builds and runs the job on a local environment.
+        *
+        * @param args
+        *            optional; the first element, when present, is the path of
+        *            the properties file holding the Twitter authentication
+        *            data. Without it the built-in default path is used.
+        */
+       public static void main(String[] args) {
+
+               // Prefer the path from the command line; keep the old hard-coded
+               // location only as a fallback so existing invocations still work.
+               String path = (args != null && args.length > 0) ? args[0] : DEFAULT_AUTH_PATH;
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment
+                               .createLocalEnvironment(PARALLELISM);
+
+               DataStream<Tuple1<String>> streamSource = env.addSource(
+                               new TwitterSource(path, NUMBER_OF_TWEETS), SOURCE_PARALLELISM);
+
+               DataStream<Tuple5<Long, Long, String, String, String>> selectedDataStream = streamSource
+                               .flatMap(new SelectDataFlatMap());
+
+               selectedDataStream.addSink(new TwitterSink());
+
+               env.execute();
+       }
+}

Reply via email to