Repository: incubator-streams-examples
Updated Branches:
  refs/heads/master 76d7d9875 -> 32fd33674


deleting unfinished module, committed inadvertently


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/0d0068c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/0d0068c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/0d0068c4

Branch: refs/heads/master
Commit: 0d0068c41e88f5c70270e506f5cd94612224a502
Parents: 76d7d98
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Authored: Sun Dec 18 11:22:18 2016 -0600
Committer: Steve Blackmon @steveblackmon <sblack...@apache.org>
Committed: Sun Dec 18 11:22:18 2016 -0600

----------------------------------------------------------------------
 .../src/main/scala/facebook.scala               | 161 ----------------
 .../src/main/scala/gplus.scala                  |  82 --------
 .../src/main/scala/setup.scala                  |  27 ---
 .../src/main/scala/twitter.scala                | 191 -------------------
 .../src/main/scala/youtube.scala                | 120 ------------
 5 files changed, 581 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0d0068c4/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala
deleted file mode 100644
index 9a7afcf..0000000
--- a/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-%spark.dep
-z.reset()
-z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot()
-z.load("org.apache.streams:streams-provider-facebook:0.4-incubating-SNAPSHOT")
-
-%spark
-import com.typesafe.config._
-import org.apache.streams.config._
-import org.apache.streams.core._
-import org.apache.streams.facebook._
-import org.apache.streams.facebook.graph._
-import java.util.Iterator
-
-%spark
-val credentials =
-  """
-    |facebook {
-    |  oauth {
-    |    appId = "299258633581961"
-    |    appSecret = 03b887d68ee4a3117f9f087330fe8c8f
-    |  }
-    |  userAccessTokens = [
-    |EAACEdEose0cBAG4nq7ZB36wwCGv14UToDpZCwXgZA1ZCuShBp1tPQozsbxU75RaOEiJKx75sQgox6wCNgx6rCrEL5K96oNE9EoGutFPBPAEWBZAo7xlgfx715HhAdqdmoaaFTbwJWwruehr1FwIXJr2OAfsxFrqYbPYUkXXojAtSgoEm9WrhW6RRa7os6xBIZD
-    |  ]
-    |}
-    |"""
-val credentialsConfig = ConfigFactory.parseString(credentials)
-
-%spark
-val accounts =
-  """
-    |facebook {
-    |  ids = [
-    |    {
-    |      #"id": "Apache-Software-Foundation"
-    |      "id": "108021202551732"
-    |    },
-    |    {
-    |      #"id": "Apache-Spark"
-    |      "id": "695067547183193"
-    |    },
-    |    {
-    |      # Apache-Cordova
-    |      "id": "144287225588642"
-    |    },
-    |    {
-    |      # Apache-HTTP-Server
-    |      "id": "107703115926025"
-    |    },
-    |    {
-    |      # Apache-Cassandra
-    |      "id": "136080266420061"
-    |    },
-    |    {
-    |      # Apache-Solr
-    |      "id": "333596995194"
-    |    },
-    |    {
-    |      # Apache-CXF
-    |      "id": "509899489117171"
-    |    },
-    |    {
-    |      # Apache-Kafka
-    |      "id": "109576742394607"
-    |    },
-    |    {
-    |      # Apache-Groovy
-    |      "id": "112510602100049"
-    |    },
-    |    {
-    |      # Apache-Hadoop
-    |      "id": "102175453157656"
-    |    },
-    |    {
-    |      # Apache-Hive
-    |      "id": "192818954063511"
-    |    },
-    |    {
-    |      # Apache-Mahout
-    |      "id": "109528065733066"
-    |    },
-    |    {
-    |      # Apache-HBase
-    |      "id": "103760282995363"
-    |    }
-    |  ]
-    |}
-    |"""
-val accountsConfig = ConfigFactory.parseString(accounts)
-
-%spark
-val reference = ConfigFactory.load()
-val typesafe = accountsConfig.withFallback(credentialsConfig).withFallback(reference).resolve()
-val config = new ComponentConfigurator(classOf[FacebookUserInformationConfiguration]).detectConfiguration(typesafe, "facebook");
-
-%spark
-// Pull info on those accounts
-val FacebookPageProvider = new FacebookPageProvider(config);
-FacebookPageProvider.prepare(null)
-FacebookPageProvider.startStream()
-//
-val userdata_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
-while(FacebookPageProvider.isRunning()) {
-  val resultSet = FacebookPageProvider.readCurrent()
-  resultSet.size()
-  val iterator = resultSet.iterator();
-  while(iterator.hasNext()) {
-    val datum = iterator.next();
-    userdata_buf += datum.getDocument
-  }
-}
-
-%spark
-//Pull activity from those accounts
-val FacebookPageFeedProvider = new FacebookPageFeedProvider(config);
-FacebookPageFeedProvider.prepare(null)
-FacebookPageFeedProvider.startStream()
-while(FacebookPageFeedProvider.isRunning())
-//
-val useractivity_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
-while(FacebookPageFeedProvider.isRunning()) {
-  val resultSet = FacebookPageFeedProvider.readCurrent()
-  resultSet.size()
-  val iterator = resultSet.iterator();
-  while(iterator.hasNext()) {
-    val datum = iterator.next();
-    useractivity_buf += datum.getDocument
-  }
-}
-
-%spark
-//Normalize person(s) -> page(s)
-val FacebookTypeConverter = new FacebookTypeConverter(classOf[Page], classOf[Page])
-FacebookTypeConverter.prepare()
-val userdata_pages = userdata_buf.flatMap(x => FacebookTypeConverter.process(x))
-
-%spark
-//Normalize activities) -> posts(s)
-val FacebookTypeConverter = new FacebookTypeConverter(classOf[Post], classOf[Post])
-FacebookTypeConverter.prepare()
-val useractivity_posts = useractivity_buf.flatMap(x => FacebookTypeConverter.process(x))
-
-

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0d0068c4/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala
deleted file mode 100644
index 971d5f5..0000000
--- a/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-%spark
-val reference = ConfigFactory.load()
-val typesafe = accountsConfig.withFallback(credentialsConfig).withFallback(reference).resolve()
-val config = new ComponentConfigurator(classOf[GPlusConfiguration]).detectConfiguration(typesafe, "gplus");
-
-%spark
-// Pull info on those accounts
-val GPlusUserDataProvider = new GPlusUserDataProvider(config);
-GPlusUserDataProvider.prepare(null)
-GPlusUserDataProvider.startStream()
-//
-val userdata_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
-while(GPlusUserDataProvider.isRunning()) {
-  val resultSet = GPlusUserDataProvider.readCurrent()
-  resultSet.size()
-  val iterator = resultSet.iterator();
-  while(iterator.hasNext()) {
-    val datum = iterator.next();
-    userdata_buf += datum.getDocument
-  }
-}
-
-%spark
-//Pull activity from those accounts
-val GPlusUserActivityProvider = new GPlusUserActivityProvider(config);
-GPlusUserActivityProvider.prepare(null)
-GPlusUserActivityProvider.startStream()
-while(GPlusUserActivityProvider.isRunning())
-//
-val useractivity_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
-while(GPlusUserActivityProvider.isRunning()) {
-  val resultSet = GPlusUserActivityProvider.readCurrent()
-  resultSet.size()
-  val iterator = resultSet.iterator();
-  while(iterator.hasNext()) {
-    val datum = iterator.next();
-    useractivity_buf += datum.getDocument
-  }
-}
-
-%spark
-//Normalize person(s) -> page(s)
-val GooglePlusTypeConverter = new GooglePlusTypeConverter()
-GooglePlusTypeConverter.prepare()
-val userdata_pages = userdata_buf.flatMap(x => GooglePlusTypeConverter.process(x))
-
-
-%spark
-import com.google.gson.ExclusionStrategy
-import com.google.gson.FieldAttributes
-import com.sun.javafx.runtime.async.AbstractAsyncOperation
-import sun.jvm.hotspot.runtime.NativeSignatureIterator
-class MyExclusionStrategy extends ExclusionStrategy {
-  def shouldSkipField(f: FieldAttributes) : Boolean {
-    f.getName().toLowerCase().contains("additionalProperties");
-  }
-}
-
-//Normalize activities) -> posts(s)
-val GooglePlusTypeConverter = new GooglePlusTypeConverter()
-GooglePlusTypeConverter.prepare()
-val useractivity_posts = useractivity_buf.flatMap(x => GooglePlusTypeConverter.process(x))
-
-

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0d0068c4/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala
deleted file mode 100644
index 3f224ff..0000000
--- a/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-%spark.dep
-z.reset()
-z.load("org.apache.streams:streams-core:0.4-incubating")
-z.load("org.apache.streams:streams-converters:0.4-incubating")
-z.load("org.apache.streams:streams-pojo:0.4-incubating")
-z.load("org.apache.streams:streams-provider-twitter:0.4-incubating")
-z.load("org.apache.streams:streams-provider-facebook:0.4-incubating")
-z.load("org.apache.streams:streams-provider-youtube:0.4-incubating")
-z.load("org.apache.streams:google-gplus:0.4-incubating")

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0d0068c4/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala
deleted file mode 100644
index 0911102..0000000
--- a/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-%spark.dep
-z.reset()
-z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot()
-z.load("org.apache.streams:streams-provider-twitter:0.4-incubating-SNAPSHOT")
-
-%spark
-import com.typesafe.config._
-import org.apache.streams.config._
-import org.apache.streams.core._
-import java.util.Iterator
-import org.apache.streams.twitter.TwitterUserInformationConfiguration
-import org.apache.streams.twitter.pojo._
-import org.apache.streams.twitter.provider._
-
-%spark
-val consumerKey = z.input("ConsumerKey", "jF3awfLECUZ4tFAwS5bZcha8c")
-val consumerSecret = z.input("ConsumerSecret", "0IjoS5aPE88kNSREK6HNzAhUcJMziSlaT1fOkA5pzpusZLrhCj")
-val accessToken = z.input("AccessToken", "42232950-CzaYlt2M6SPGI883B5NZ8vROcL4qUsTJlp9wIM2K2")
-val accessTokenSecret = z.input("AccessTokenSecret", "vviQzladFUl23hdVelEiIknSLoHfAs40DqTv3RdXHhmz0")
-
-%spark
-val credentials_hocon = s"""
-    twitter {
-      oauth {
-       consumerKey = "$consumerKey"
-    consumerSecret = "$consumerSecret"
-    accessToken = "$accessToken"
-    accessTokenSecret = "$accessTokenSecret"
-      }
-      retrySleepMs = 5000
-  retryMax = 250
-    }
-"""
-
-%spark
-val accounts_hocon = s"""
-twitter.info = [
-#    "ApacheSpark"
-    1551361069
-#    "ApacheFlink"
-    2493948216
-#    "ApacheKafka"
-    1287555762
-#   "Hadoop"
-    175671289
-#   "ApacheCassandra"
-    19041500
-#   "ApacheSolr"
-    22742048
-#   "ApacheMahout"
-    1123330975
-#   "ApacheHive"
-    1188044936
-#   "ApacheHbase"
-    2912997159
-]
-"""
-
-%spark
-val reference = ConfigFactory.load()
-val credentials = ConfigFactory.parseString(credentials_hocon)
-val accounts = ConfigFactory.parseString(accounts_hocon)
-val typesafe = accounts.withFallback(credentials).withFallback(reference).resolve()
-val twitterUserInformationConfiguration = new ComponentConfigurator(classOf[TwitterUserInformationConfiguration]).detectConfiguration(typesafe, "twitter");
-
-%spark
-val userdata_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
-
-val twitterUserInformationProvider = new TwitterUserInformationProvider(twitterUserInformationConfiguration);
-twitterUserInformationProvider.prepare()
-twitterUserInformationProvider.startStream()
-while(twitterUserInformationProvider.isRunning()) {
-  val resultSet = twitterUserInformationProvider.readCurrent()
-  resultSet.size()
-  val iterator = resultSet.iterator();
-  while(iterator.hasNext()) {
-    val datum = iterator.next();
-    //println(datum.getDocument)
-    userdata_buf += datum.getDocument
-  }
-}
-userdata_buf.size
-
-%spark
-import com.typesafe.config._
-import org.apache.streams.config._
-import org.apache.streams.core._
-import java.util.Iterator
-import org.apache.streams.twitter.TwitterUserInformationConfiguration
-
-import org.apache.streams.twitter.pojo._
-import org.apache.streams.twitter.provider._
-
-val timeline_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
-
-val twitterTimelineProvider = new TwitterTimelineProvider(twitterUserInformationConfiguration);
-twitterTimelineProvider.prepare(twitterUserInformationConfiguration)
-twitterTimelineProvider.startStream()
-while(twitterTimelineProvider.isRunning()) {
-  val resultSet = twitterTimelineProvider.readCurrent()
-  resultSet.size()
-  val iterator = resultSet.iterator();
-  while(iterator.hasNext()) {
-    val datum = iterator.next();
-    //println(datum.getDocument)
-    timeline_buf += datum.getDocument
-  }
-}
-timeline_buf.size
-
-%spark
-import org.apache.streams.converter.ActivityObjectConverterProcessor
-import org.apache.streams.core.StreamsProcessor
-import org.apache.streams.pojo.json.ActivityObject
-import scala.collection.JavaConverters
-import scala.collection.JavaConversions._
-
-val converter = new ActivityObjectConverterProcessor()
-converter.prepare()
-
-val user_datums = userdata_buf.map(x => new StreamsDatum(x))
-val actor_datums = user_datums.flatMap(x => converter.process(x))
-val pages = actor_datums.map(x => x.getDocument.asInstanceOf[ActivityObject])
-
-%spark
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import sqlContext._
-import sqlContext.implicits._
-
-val mapper = StreamsJacksonMapper.getInstance();
-val pages_jsons = pages.map(o => mapper.writeValueAsString(o))
-val pagesRDD = sc.parallelize(pages_jsons)
-
-val pagesDF = sqlContext.read.json(pagesRDD)
-
-val pagescleanDF = pagesDF.withColumn("summary", removePunctuationAndSpecialChar(pagesDF("summary")))
-pagescleanDF.registerTempTable("twitter_pages")
-pagescleanDF.printSchema
-
-%spark
-import org.apache.streams.converter.ActivityConverterProcessor
-import org.apache.streams.core.StreamsProcessor
-import org.apache.streams.pojo.json.Activity
-import scala.collection.JavaConverters
-import scala.collection.JavaConversions._
-
-val converter = new ActivityConverterProcessor()
-converter.prepare()
-
-val status_datums = timeline_buf.map(x => new StreamsDatum(x))
-val activity_datums = status_datums.flatMap(x => converter.process(x)).map(x => x.getDocument.asInstanceOf[Activity])
-activity_datums.size
-
-%spark
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import sqlContext._
-import sqlContext.implicits._
-
-val mapper = StreamsJacksonMapper.getInstance();
-val jsons = activity_datums.map(o => mapper.writeValueAsString(o))
-val activitiesRDD = sc.parallelize(jsons)
-
-val activitiesDF = sqlContext.read.json(activitiesRDD)
-
-val cleanDF = activitiesDF.withColumn("content", removePunctuationAndSpecialChar(activitiesDF("content")))
-cleanDF.registerTempTable("twitter_posts")
-cleanDF.printSchema
-
-%spark.sql
-select id, displayName, handle, summary, extensions.favorites, extensions.followers, extensions.posts from twitter_pages
-
-%spark.sql
-select id, actor.id, content, published, rebroadcasts.count from twitter_posts

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0d0068c4/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala
deleted file mode 100644
index e988752..0000000
--- a/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-%spark.dep
-z.reset()
-z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot()
-z.load("org.apache.streams:streams-provider-youtube:0.4-incubating-SNAPSHOT")
-
-%spark
-import com.typesafe.config._
-import org.apache.streams.config._
-import org.apache.streams.core._
-import com.youtube.provider._
-import org.apache.youtube.pojo._
-import java.util.Iterator
-
-%spark
-val credentials =
-  """
-  |youtube {
-  |  apiKey = 79d9f9ca2796d1ec5334faf8d6efaa6456a297e6
-  |  oauth {
-  |    serviceAccountEmailAddress = "streams...@adroit-particle-764.iam.gserviceaccount.com"
-  |    pathToP12KeyFile = streams-c84fa47bd759.p12
-  |  }
-  |}
-  |"""
-val credentialsConfig = ConfigFactory.parseString(credentials)
-
-%spark
-val accounts =
-  """
-    |youtube {
-    |  youtubeUsers = [
-    |    {
-    |      userId = "UCLDJ_V9KUOdOFSbDvPfGBxw"
-    |    }
-    |  ]
-    |}
-    |"""
-val accountsConfig = ConfigFactory.parseString(accounts)
-
-%spark
-val reference = ConfigFactory.load()
-val typesafe = accountsConfig.withFallback(credentialsConfig).withFallback(reference).resolve()
-val config = new ComponentConfigurator(classOf[YoutubeConfiguration]).detectConfiguration(typesafe, "youtube");
-
-%spark
-// Pull info on those channels
-val YoutubeChannelProvider = new YoutubeChannelProvider(config);
-YoutubeChannelProvider.prepare(null)
-YoutubeChannelProvider.startStream()
-//
-val channel_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
-while(YoutubeChannelProvider.isRunning()) {
-  val resultSet = YoutubeChannelProvider.readCurrent()
-  resultSet.size()
-  val iterator = resultSet.iterator();
-  while(iterator.hasNext()) {
-    val datum = iterator.next();
-    channel_buf += datum.getDocument
-  }
-}
-
-%spark
-//Pull activity from those accounts
-val YoutubeUserActivityProvider = new YoutubeUserActivityProvider(config);
-YoutubeUserActivityProvider.prepare(null)
-YoutubeUserActivityProvider.startStream()
-while(YoutubeUserActivityProvider.isRunning())
-//
-val useractivity_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
-while(YoutubeUserActivityProvider.isRunning()) {
-  val resultSet = YoutubeUserActivityProvider.readCurrent()
-  resultSet.size()
-  val iterator = resultSet.iterator();
-  while(iterator.hasNext()) {
-    val datum = iterator.next();
-    useractivity_buf += datum.getDocument
-  }
-}
-
-%spark
-import org.apache.streams.core.StreamsDatum
-import com.youtube.processor._
-import scala.collection.JavaConversions._
-//Normalize activities -> posts(s)
-val YoutubeTypeConverter = new YoutubeTypeConverter()
-YoutubeTypeConverter.prepare()
-val useractivity_posts = useractivity_buf.flatMap(x => YoutubeTypeConverter.process(x))
-
-%spark
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-
-val mapper = StreamsJacksonMapper.getInstance();
-val activitiesRDD = sc.parallelize(useractivity_posts.map(o => mapper.writeValueAsString(o)))
-
-val activitiesDF = sqlContext.read.json(activitiesRDD)
-
-activitiesDF.registerTempTable("activities")
-
-%spark.sql
-select count(id) from activitiesDF
