Repository: incubator-streams-examples Updated Branches: refs/heads/master 76d7d9875 -> 32fd33674
deleting un-finished module, committed inadvertently Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/0d0068c4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/0d0068c4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/0d0068c4 Branch: refs/heads/master Commit: 0d0068c41e88f5c70270e506f5cd94612224a502 Parents: 76d7d98 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Authored: Sun Dec 18 11:22:18 2016 -0600 Committer: Steve Blackmon @steveblackmon <sblack...@apache.org> Committed: Sun Dec 18 11:22:18 2016 -0600 ---------------------------------------------------------------------- .../src/main/scala/facebook.scala | 161 ---------------- .../src/main/scala/gplus.scala | 82 -------- .../src/main/scala/setup.scala | 27 --- .../src/main/scala/twitter.scala | 191 ------------------- .../src/main/scala/youtube.scala | 120 ------------ 5 files changed, 581 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0d0068c4/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala ---------------------------------------------------------------------- diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala deleted file mode 100644 index 9a7afcf..0000000 --- a/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -%spark.dep -z.reset() -z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot() -z.load("org.apache.streams:streams-provider-facebook:0.4-incubating-SNAPSHOT") - -%spark -import com.typesafe.config._ -import org.apache.streams.config._ -import org.apache.streams.core._ -import org.apache.streams.facebook._ -import org.apache.streams.facebook.graph._ -import java.util.Iterator - -%spark -val credentials = - """ - |facebook { - | oauth { - | appId = "299258633581961" - | appSecret = 03b887d68ee4a3117f9f087330fe8c8f - | } - | userAccessTokens = [ - |EAACEdEose0cBAG4nq7ZB36wwCGv14UToDpZCwXgZA1ZCuShBp1tPQozsbxU75RaOEiJKx75sQgox6wCNgx6rCrEL5K96oNE9EoGutFPBPAEWBZAo7xlgfx715HhAdqdmoaaFTbwJWwruehr1FwIXJr2OAfsxFrqYbPYUkXXojAtSgoEm9WrhW6RRa7os6xBIZD - | ] - |} - |""" -val credentialsConfig = ConfigFactory.parseString(credentials) - -%spark -val accounts = - """ - |facebook { - | ids = [ - | { - | #"id": "Apache-Software-Foundation" - | "id": "108021202551732" - | }, - | { - | #"id": "Apache-Spark" - | "id": "695067547183193" - | }, - | { - | # Apache-Cordova - | "id": "144287225588642" - | }, - | { - | # Apache-HTTP-Server - | "id": "107703115926025" - | }, - | { - | # Apache-Cassandra - | "id": "136080266420061" - | }, - | { - | # Apache-Solr - | "id": "333596995194" - | }, - | { - | # Apache-CXF - | "id": "509899489117171" - | }, - | { - | # Apache-Kafka - | "id": "109576742394607" - | }, - | { - | # Apache-Groovy - | "id": "112510602100049" - | }, - | { - | # Apache-Hadoop - | "id": "102175453157656" - | }, - | { - | # Apache-Hive - | "id": "192818954063511" - | }, - | { - | # Apache-Mahout - | "id": "109528065733066" - | }, - | { - | # Apache-HBase - | "id": "103760282995363" - | } - | ] - |} - |""" -val accountsConfig = ConfigFactory.parseString(accounts) - -%spark -val reference = ConfigFactory.load() -val typesafe = accountsConfig.withFallback(credentialsConfig).withFallback(reference).resolve() -val config = new ComponentConfigurator(classOf[FacebookUserInformationConfiguration]).detectConfiguration(typesafe, "facebook"); - -%spark -// Pull info on those accounts -val FacebookPageProvider = new FacebookPageProvider(config); -FacebookPageProvider.prepare(null) -FacebookPageProvider.startStream() -// -val userdata_buf = scala.collection.mutable.ArrayBuffer.empty[Object] -while(FacebookPageProvider.isRunning()) { - val resultSet = FacebookPageProvider.readCurrent() - resultSet.size() - val iterator = resultSet.iterator(); - while(iterator.hasNext()) { - val datum = iterator.next(); - userdata_buf += datum.getDocument - } -} - -%spark -//Pull activity from those accounts -val FacebookPageFeedProvider = new FacebookPageFeedProvider(config); -FacebookPageFeedProvider.prepare(null) -FacebookPageFeedProvider.startStream() -while(FacebookPageFeedProvider.isRunning()) -// -val useractivity_buf = scala.collection.mutable.ArrayBuffer.empty[Object] -while(FacebookPageFeedProvider.isRunning()) { - val resultSet = FacebookPageFeedProvider.readCurrent() - resultSet.size() - val iterator = resultSet.iterator(); - while(iterator.hasNext()) { - val datum = iterator.next(); - useractivity_buf += datum.getDocument - } -} - -%spark -//Normalize person(s) -> page(s) -val FacebookTypeConverter = new FacebookTypeConverter(classOf[Page], classOf[Page]) -FacebookTypeConverter.prepare() -val userdata_pages = userdata_buf.flatMap(x => FacebookTypeConverter.process(x)) - -%spark -//Normalize activities) -> posts(s) -val FacebookTypeConverter = new FacebookTypeConverter(classOf[Post], classOf[Post]) -FacebookTypeConverter.prepare() -val useractivity_posts = useractivity_buf.flatMap(x => FacebookTypeConverter.process(x)) - - http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0d0068c4/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala ---------------------------------------------------------------------- diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala deleted file mode 100644 index 971d5f5..0000000 --- a/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -%spark -val reference = ConfigFactory.load() -val typesafe = accountsConfig.withFallback(credentialsConfig).withFallback(reference).resolve() -val config = new ComponentConfigurator(classOf[GPlusConfiguration]).detectConfiguration(typesafe, "gplus"); - -%spark -// Pull info on those accounts -val GPlusUserDataProvider = new GPlusUserDataProvider(config); -GPlusUserDataProvider.prepare(null) -GPlusUserDataProvider.startStream() -// -val userdata_buf = scala.collection.mutable.ArrayBuffer.empty[Object] -while(GPlusUserDataProvider.isRunning()) { - val resultSet = GPlusUserDataProvider.readCurrent() - resultSet.size() - val iterator = resultSet.iterator(); - while(iterator.hasNext()) { - val datum = iterator.next(); - userdata_buf += datum.getDocument - } -} - -%spark -//Pull activity from those accounts -val GPlusUserActivityProvider = new GPlusUserActivityProvider(config); -GPlusUserActivityProvider.prepare(null) -GPlusUserActivityProvider.startStream() -while(GPlusUserActivityProvider.isRunning()) -// -val useractivity_buf = scala.collection.mutable.ArrayBuffer.empty[Object] -while(GPlusUserActivityProvider.isRunning()) { - val resultSet = GPlusUserActivityProvider.readCurrent() - resultSet.size() - val iterator = resultSet.iterator(); - while(iterator.hasNext()) { - val datum = iterator.next(); - useractivity_buf += datum.getDocument - } -} - -%spark -//Normalize person(s) -> page(s) -val GooglePlusTypeConverter = new GooglePlusTypeConverter() -GooglePlusTypeConverter.prepare() -val userdata_pages = userdata_buf.flatMap(x => GooglePlusTypeConverter.process(x)) - - -%spark -import com.google.gson.ExclusionStrategy -import com.google.gson.FieldAttributes -import com.sun.javafx.runtime.async.AbstractAsyncOperation -import sun.jvm.hotspot.runtime.NativeSignatureIterator -class MyExclusionStrategy extends ExclusionStrategy { - def shouldSkipField(f: FieldAttributes) : Boolean { - f.getName().toLowerCase().contains("additionalProperties"); - } -} - -//Normalize activities) -> posts(s) -val GooglePlusTypeConverter = new GooglePlusTypeConverter() -GooglePlusTypeConverter.prepare() -val useractivity_posts = useractivity_buf.flatMap(x => GooglePlusTypeConverter.process(x)) - - http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0d0068c4/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala ---------------------------------------------------------------------- diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala deleted file mode 100644 index 3f224ff..0000000 --- a/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -%spark.dep -z.reset() -z.load("org.apache.streams:streams-core:0.4-incubating") -z.load("org.apache.streams:streams-converters:0.4-incubating") -z.load("org.apache.streams:streams-pojo:0.4-incubating") -z.load("org.apache.streams:streams-provider-twitter:0.4-incubating") -z.load("org.apache.streams:streams-provider-facebook:0.4-incubating") -z.load("org.apache.streams:streams-provider-youtube:0.4-incubating") -z.load("org.apache.streams:google-gplus:0.4-incubating") http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0d0068c4/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala ---------------------------------------------------------------------- diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala deleted file mode 100644 index 0911102..0000000 --- a/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -%spark.dep -z.reset() -z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot() -z.load("org.apache.streams:streams-provider-twitter:0.4-incubating-SNAPSHOT") - -%spark -import com.typesafe.config._ -import org.apache.streams.config._ -import org.apache.streams.core._ -import java.util.Iterator -import org.apache.streams.twitter.TwitterUserInformationConfiguration -import org.apache.streams.twitter.pojo._ -import org.apache.streams.twitter.provider._ - -%spark -val consumerKey = z.input("ConsumerKey", "jF3awfLECUZ4tFAwS5bZcha8c") -val consumerSecret = z.input("ConsumerSecret", "0IjoS5aPE88kNSREK6HNzAhUcJMziSlaT1fOkA5pzpusZLrhCj") -val accessToken = z.input("AccessToken", "42232950-CzaYlt2M6SPGI883B5NZ8vROcL4qUsTJlp9wIM2K2") -val accessTokenSecret = z.input("AccessTokenSecret", "vviQzladFUl23hdVelEiIknSLoHfAs40DqTv3RdXHhmz0") - -%spark -val credentials_hocon = s""" - twitter { - oauth { - consumerKey = "$consumerKey" - consumerSecret = "$consumerSecret" - accessToken = "$accessToken" - accessTokenSecret = "$accessTokenSecret" - } - retrySleepMs = 5000 - retryMax = 250 - } -""" - -%spark -val accounts_hocon = s""" -twitter.info = [ -# "ApacheSpark" - 1551361069 -# "ApacheFlink" - 2493948216 -# "ApacheKafka" - 1287555762 -# "Hadoop" - 175671289 -# "ApacheCassandra" - 19041500 -# "ApacheSolr" - 22742048 -# "ApacheMahout" - 1123330975 -# "ApacheHive" - 1188044936 -# "ApacheHbase" - 2912997159 -] -""" - -%spark -val reference = ConfigFactory.load() -val credentials = ConfigFactory.parseString(credentials_hocon) -val accounts = ConfigFactory.parseString(accounts_hocon) -val typesafe = accounts.withFallback(credentials).withFallback(reference).resolve() -val twitterUserInformationConfiguration = new ComponentConfigurator(classOf[TwitterUserInformationConfiguration]).detectConfiguration(typesafe, "twitter"); - -%spark -val userdata_buf = scala.collection.mutable.ArrayBuffer.empty[Object] - -val twitterUserInformationProvider = new TwitterUserInformationProvider(twitterUserInformationConfiguration); -twitterUserInformationProvider.prepare() -twitterUserInformationProvider.startStream() -while(twitterUserInformationProvider.isRunning()) { - val resultSet = twitterUserInformationProvider.readCurrent() - resultSet.size() - val iterator = resultSet.iterator(); - while(iterator.hasNext()) { - val datum = iterator.next(); - //println(datum.getDocument) - userdata_buf += datum.getDocument - } -} -userdata_buf.size - -%spark -import com.typesafe.config._ -import org.apache.streams.config._ -import org.apache.streams.core._ -import java.util.Iterator -import org.apache.streams.twitter.TwitterUserInformationConfiguration - -import org.apache.streams.twitter.pojo._ -import org.apache.streams.twitter.provider._ - -val timeline_buf = scala.collection.mutable.ArrayBuffer.empty[Object] - -val twitterTimelineProvider = new TwitterTimelineProvider(twitterUserInformationConfiguration); -twitterTimelineProvider.prepare(twitterUserInformationConfiguration) -twitterTimelineProvider.startStream() -while(twitterTimelineProvider.isRunning()) { - val resultSet = twitterTimelineProvider.readCurrent() - resultSet.size() - val iterator = resultSet.iterator(); - while(iterator.hasNext()) { - val datum = iterator.next(); - //println(datum.getDocument) - timeline_buf += datum.getDocument - } -} -timeline_buf.size - -%spark -import org.apache.streams.converter.ActivityObjectConverterProcessor -import org.apache.streams.core.StreamsProcessor -import org.apache.streams.pojo.json.ActivityObject -import scala.collection.JavaConverters -import scala.collection.JavaConversions._ - -val converter = new ActivityObjectConverterProcessor() -converter.prepare() - -val user_datums = userdata_buf.map(x => new StreamsDatum(x)) -val actor_datums = user_datums.flatMap(x => converter.process(x)) -val pages = actor_datums.map(x => x.getDocument.asInstanceOf[ActivityObject]) - -%spark -import org.apache.streams.jackson.StreamsJacksonMapper; -import sqlContext._ -import sqlContext.implicits._ - -val mapper = StreamsJacksonMapper.getInstance(); -val pages_jsons = pages.map(o => mapper.writeValueAsString(o)) -val pagesRDD = sc.parallelize(pages_jsons) - -val pagesDF = sqlContext.read.json(pagesRDD) - -val pagescleanDF = pagesDF.withColumn("summary", removePunctuationAndSpecialChar(pagesDF("summary"))) -pagescleanDF.registerTempTable("twitter_pages") -pagescleanDF.printSchema - -%spark -import org.apache.streams.converter.ActivityConverterProcessor -import org.apache.streams.core.StreamsProcessor -import org.apache.streams.pojo.json.Activity -import scala.collection.JavaConverters -import scala.collection.JavaConversions._ - -val converter = new ActivityConverterProcessor() -converter.prepare() - -val status_datums = timeline_buf.map(x => new StreamsDatum(x)) -val activity_datums = status_datums.flatMap(x => converter.process(x)).map(x => x.getDocument.asInstanceOf[Activity]) -activity_datums.size - -%spark -import org.apache.streams.jackson.StreamsJacksonMapper; -import sqlContext._ -import sqlContext.implicits._ - -val mapper = StreamsJacksonMapper.getInstance(); -val jsons = activity_datums.map(o => mapper.writeValueAsString(o)) -val activitiesRDD = sc.parallelize(jsons) - -val activitiesDF = sqlContext.read.json(activitiesRDD) - -val cleanDF = activitiesDF.withColumn("content", removePunctuationAndSpecialChar(activitiesDF("content"))) -cleanDF.registerTempTable("twitter_posts") -cleanDF.printSchema - -%spark.sql -select id, displayName, handle, summary, extensions.favorites, extensions.followers, extensions.posts from twitter_pages - -%spark.sql -select id, actor.id, content, published, rebroadcasts.count from twitter_posts http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0d0068c4/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala ---------------------------------------------------------------------- diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala deleted file mode 100644 index e988752..0000000 --- a/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -%spark.dep -z.reset() -z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot() -z.load("org.apache.streams:streams-provider-youtube:0.4-incubating-SNAPSHOT") - -%spark -import com.typesafe.config._ -import org.apache.streams.config._ -import org.apache.streams.core._ -import com.youtube.provider._ -import org.apache.youtube.pojo._ -import java.util.Iterator - -%spark -val credentials = - """ - |youtube { - | apiKey = 79d9f9ca2796d1ec5334faf8d6efaa6456a297e6 - | oauth { - | serviceAccountEmailAddress = "streams...@adroit-particle-764.iam.gserviceaccount.com" - | pathToP12KeyFile = streams-c84fa47bd759.p12 - | } - |} - |""" -val credentialsConfig = ConfigFactory.parseString(credentials) - -%spark -val accounts = - """ - |youtube { - | youtubeUsers = [ - | { - | userId = "UCLDJ_V9KUOdOFSbDvPfGBxw" - | } - | ] - |} - |""" -val accountsConfig = ConfigFactory.parseString(accounts) - -%spark -val reference = ConfigFactory.load() -val typesafe = accountsConfig.withFallback(credentialsConfig).withFallback(reference).resolve() -val config = new ComponentConfigurator(classOf[YoutubeConfiguration]).detectConfiguration(typesafe, "youtube"); - -%spark -// Pull info on those channels -val YoutubeChannelProvider = new YoutubeChannelProvider(config); -YoutubeChannelProvider.prepare(null) -YoutubeChannelProvider.startStream() -// -val channel_buf = scala.collection.mutable.ArrayBuffer.empty[Object] -while(YoutubeChannelProvider.isRunning()) { - val resultSet = YoutubeChannelProvider.readCurrent() - resultSet.size() - val iterator = resultSet.iterator(); - while(iterator.hasNext()) { - val datum = iterator.next(); - channel_buf += datum.getDocument - } -} - -%spark -//Pull activity from those accounts -val YoutubeUserActivityProvider = new YoutubeUserActivityProvider(config); -YoutubeUserActivityProvider.prepare(null) -YoutubeUserActivityProvider.startStream() -while(YoutubeUserActivityProvider.isRunning()) -// -val useractivity_buf = scala.collection.mutable.ArrayBuffer.empty[Object] -while(YoutubeUserActivityProvider.isRunning()) { - val resultSet = YoutubeUserActivityProvider.readCurrent() - resultSet.size() - val iterator = resultSet.iterator(); - while(iterator.hasNext()) { - val datum = iterator.next(); - useractivity_buf += datum.getDocument - } -} - -%spark -import org.apache.streams.core.StreamsDatum -import com.youtube.processor._ -import scala.collection.JavaConversions._ -//Normalize activities -> posts(s) -val YoutubeTypeConverter = new YoutubeTypeConverter() -YoutubeTypeConverter.prepare() -val useractivity_posts = useractivity_buf.flatMap(x => YoutubeTypeConverter.process(x)) - -%spark -import org.apache.streams.jackson.StreamsJacksonMapper; - -val sqlContext = new org.apache.spark.sql.SQLContext(sc) - -val mapper = StreamsJacksonMapper.getInstance(); -val activitiesRDD = sc.parallelize(useractivity_posts.map(o => mapper.writeValueAsString(o))) - -val activitiesDF = sqlContext.read.json(activitiesRDD) - -activitiesDF.registerTempTable("activities") - -%spark.sql -select count(id) from activitiesDF