Repository: incubator-streams-examples
Updated Branches:
  refs/heads/master 3eb2e3526 -> 1fb1e0e11
switch elasticsearch-hdfs to testng

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/47f159e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/47f159e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/47f159e1

Branch: refs/heads/master
Commit: 47f159e1e3ba01ce24fbb80a20223be2f1c1efd5
Parents: 3eb2e35
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Authored: Thu Dec 15 12:20:51 2016 -0600
Committer: Steve Blackmon @steveblackmon <sblack...@apache.org>
Committed: Thu Dec 15 12:20:51 2016 -0600

----------------------------------------------------------------------
 local/elasticsearch-hdfs/pom.xml                |  77 +++++----
 .../streams/example/ElasticsearchHdfs.java      |  11 +-
 .../streams/example/HdfsElasticsearch.java      |  13 +-
 .../example/test/ElasticsearchHdfsIT.java       |  25 +--
 .../apache/streams/example/test/ExampleITs.java |  35 ----
 .../example/test/HdfsElasticsearchIT.java       |  26 +--
 .../src/test/resources/testng.xml               |  24 +++
 .../src/main/scala/facebook.scala               | 143 +++++++++++++++
 .../src/main/scala/gplus.scala                  |  68 ++++++++
 .../src/main/scala/setup.scala                  |   9 +
 .../src/main/scala/twitter.scala                | 173 +++++++++++++++++++
 .../src/main/scala/youtube.scala                | 102 +++++++++++
 12 files changed, 603 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/local/elasticsearch-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/pom.xml b/local/elasticsearch-hdfs/pom.xml
index 5830d34..1eabd45 100644
--- a/local/elasticsearch-hdfs/pom.xml
+++ b/local/elasticsearch-hdfs/pom.xml
@@ -39,31 +39,37 @@
     </properties>

     <dependencies>
-    <!-- Test includes -->
-    <dependency>
-        <groupId>org.apache.lucene</groupId>
-        <artifactId>lucene-test-framework</artifactId>
-        <version>${lucene.version}</version>
-        <scope>test</scope>
-    </dependency>
-    <dependency>
-        <groupId>org.apache.lucene</groupId>
-        <artifactId>lucene-codecs</artifactId>
-        <version>${lucene.version}</version>
-        <scope>test</scope>
-    </dependency>
-    <dependency>
-        <groupId>org.elasticsearch</groupId>
-        <artifactId>elasticsearch</artifactId>
-        <version>${elasticsearch.version}</version>
-        <type>test-jar</type>
-    </dependency>
-    <dependency>
-        <groupId>org.hamcrest</groupId>
-        <artifactId>hamcrest-all</artifactId>
-        <version>1.3</version>
-        <scope>test</scope>
-    </dependency>
+        <!-- Test includes -->
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-test-framework</artifactId>
+            <version>${lucene.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-codecs</artifactId>
+            <version>${lucene.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${elasticsearch.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>${testng.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-core</artifactId>
@@ -284,19 +290,22 @@
                 <artifactId>maven-failsafe-plugin</artifactId>
                 <configuration>
                     <!-- Run integration test suite rather than individual tests. -->
-                    <excludes>
-                        <exclude>**/*Test.java</exclude>
-                        <exclude>**/*Tests.java</exclude>
-                        <exclude>**/*IT.java</exclude>
-                    </excludes>
-                    <includes>
-                        <include>**/*ITs.java</include>
-                    </includes>
+                    <suiteXmlFiles>
+                        <suiteXmlFile>target/test-classes/testng.xml</suiteXmlFile>
+                    </suiteXmlFiles>
+                    <!--<excludes>-->
+                    <!--<exclude>**/*Test.java</exclude>-->
+                    <!--<exclude>**/*Tests.java</exclude>-->
+                    <!--</excludes>-->
+                    <!--<includes>-->
+                    <!--<exclude>**/*IT.java</exclude>-->
+                    <!--<include>**/*ITs.java</include>-->
+                    <!--</includes>-->
                 </configuration>
                 <dependencies>
                     <dependency>
                         <groupId>org.apache.maven.surefire</groupId>
-                        <artifactId>surefire-junit47</artifactId>
+                        <artifactId>surefire-testng</artifactId>
                         <version>${failsafe.plugin.version}</version>
                     </dependency>
                 </dependencies>
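Note: with surefire-testng on the failsafe classpath and a <suiteXmlFile> configured, "mvn verify" now runs exactly what testng.xml declares instead of matching class-name patterns. A minimal sketch of what a test class needs to participate in the suite — the class and values below are hypothetical, not part of this commit:

    // Hypothetical sketch: a TestNG-style IT that testng.xml could reference by
    // fully-qualified class name; discovery is by suite entry, not *IT naming.
    import org.testng.annotations.{BeforeClass, Test}
    import org.testng.Assert.assertNotEquals

    class SomeExampleIT {

      private var count: Long = 0L

      @BeforeClass
      def prepareTest(): Unit = {
        count = 89L // stand-in for counting source documents
      }

      @Test
      def testCopy(): Unit = {
        assertNotEquals(count, 0L) // TestNG asserts take (actual, expected)
      }
    }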
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
index 8d3cf36..be79f4a 100644
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
@@ -18,14 +18,15 @@

 package org.apache.streams.example;

-import com.google.common.collect.Maps;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
-import org.apache.streams.example.ElasticsearchHdfsConfiguration;
 import org.apache.streams.hdfs.WebHdfsPersistWriter;
-import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.local.builders.LocalStreamBuilder;
+
+import com.google.common.collect.Maps;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -73,8 +74,8 @@ public class ElasticsearchHdfs implements Runnable {
     streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
     StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);

-    builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader);
-    builder.addStreamsPersistWriter(WebHdfsPersistWriter.STREAMS_ID, hdfsPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID);
+    builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
+    builder.addStreamsPersistWriter(WebHdfsPersistWriter.class.getCanonicalName(), hdfsPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
     builder.start();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
index 847ac48..375665c 100644
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
@@ -18,14 +18,15 @@

 package org.apache.streams.example;

-import com.google.common.collect.Maps;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.example.HdfsElasticsearchConfiguration;
-import org.apache.streams.hdfs.WebHdfsPersistReader;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
 import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.hdfs.WebHdfsPersistReader;
 import org.apache.streams.local.builders.LocalStreamBuilder;
+
+import com.google.common.collect.Maps;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -73,8 +74,8 @@ public class HdfsElasticsearch implements Runnable {
     streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000 * 1000);
     StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);

-    builder.newPerpetualStream(WebHdfsPersistReader.STREAMS_ID, webHdfsPersistReader);
-    builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, WebHdfsPersistReader.STREAMS_ID);
+    builder.newPerpetualStream(WebHdfsPersistReader.class.getCanonicalName(), webHdfsPersistReader);
+    builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, WebHdfsPersistReader.class.getCanonicalName());
     builder.start();
   }
 }
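Note: both pipelines now derive stream ids from class names rather than the STREAMS_ID constants. Canonical names are only unique per class, so a stream that wired two components of the same class would need to disambiguate the ids itself — a hypothetical sketch, not something this commit needs:

    // Hypothetical: suffix canonical names when one stream holds two readers
    // of the same class, since builder ids must be unique.
    import org.apache.streams.elasticsearch.ElasticsearchPersistReader

    val sourceOneId = classOf[ElasticsearchPersistReader].getCanonicalName + "-1"
    val sourceTwoId = classOf[ElasticsearchPersistReader].getCanonicalName + "-2"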
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
index 86d932b..437ebf6 100644
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
@@ -18,15 +18,17 @@

 package org.apache.streams.example.test;

-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.example.ElasticsearchHdfs;
 import org.apache.streams.example.ElasticsearchHdfsConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -36,15 +38,16 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;

 import java.io.File;

-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertNotEquals;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.testng.Assert.assertNotEquals;

 /**
  * Test copying documents between hdfs and elasticsearch
@@ -60,7 +63,7 @@ public class ElasticsearchHdfsIT {

   private int count = 0;

-  @Before
+  @BeforeClass
   public void prepareTest() throws Exception {

     Config reference = ConfigFactory.load();
@@ -69,7 +72,7 @@ public class ElasticsearchHdfsIT {
     Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
     Config typesafe = testResourceConfig.withFallback(reference).resolve();
     testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
-    testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();

     ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
     ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
@@ -77,7 +80,7 @@ public class ElasticsearchHdfsIT {

     IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
     IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-    assertTrue(indicesExistsResponse.isExists());
+    assertThat(indicesExistsResponse.isExists(), is(true));

     SearchRequestBuilder countRequest = testClient
         .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
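Note: @Before -> @BeforeClass is not a pure rename. Under TestNG the setup above now runs once per class rather than before every test method, which is harmless here because prepareTest only builds a client and records a document count. A hedged sketch of the granularity difference, with made-up methods:

    import org.testng.annotations.{BeforeClass, BeforeMethod, Test}
    import org.testng.Assert.assertEquals

    // Hypothetical: @BeforeClass runs once per class; @BeforeMethod is the
    // TestNG equivalent of JUnit 4's @Before (runs before every @Test).
    class SetupGranularityIT {
      private var classSetups = 0

      @BeforeClass def once(): Unit = { classSetups += 1 }
      @BeforeMethod def each(): Unit = { /* per-test setup would go here */ }

      @Test def first(): Unit  = assertEquals(classSetups, 1)
      @Test def second(): Unit = assertEquals(classSetups, 1) // still 1
    }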
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java
deleted file mode 100644
index a9b7ecf..0000000
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.test;
-
-import org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
-    ElasticsearchPersistWriterIT.class,
-    ElasticsearchHdfsIT.class,
-    HdfsElasticsearchIT.class,
-})
-
-public class ExampleITs {
-    // the class remains empty,
-    // used only as a holder for the above annotations
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
index 35a32e7..a629025 100644
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
@@ -18,15 +18,17 @@

 package org.apache.streams.example.test;

-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.example.HdfsElasticsearch;
 import org.apache.streams.example.HdfsElasticsearchConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -38,16 +40,16 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;

 import java.io.File;

-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.AssertJUnit.assertTrue;

 /**
  * Test copying documents between hdfs and elasticsearch
@@ -61,7 +63,7 @@ public class HdfsElasticsearchIT {
   protected HdfsElasticsearchConfiguration testConfiguration;
   protected Client testClient;

-  @Before
+  @BeforeClass
   public void prepareTest() throws Exception {

     Config reference = ConfigFactory.load();
@@ -70,7 +72,7 @@ public class HdfsElasticsearchIT {
     Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
     Config typesafe = testResourceConfig.withFallback(reference).resolve();
     testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
-    testClient = new ElasticsearchClientManager(testConfiguration.getDestination()).getClient();
+    testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();

     ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
     ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
@@ -101,7 +103,7 @@ public class HdfsElasticsearchIT {
         .setTypes(testConfiguration.getDestination().getType());
     SearchResponse countResponse = countRequest.execute().actionGet();

-    assertEquals(89, countResponse.getHits().getTotalHits());
+    assertEquals(countResponse.getHits().getTotalHits(), 89);
   }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/local/elasticsearch-hdfs/src/test/resources/testng.xml
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/resources/testng.xml b/local/elasticsearch-hdfs/src/test/resources/testng.xml
new file mode 100644
index 0000000..0110e1d
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/resources/testng.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+
+<suite name="ExampleITs">
+
+    <test name="ElasticsearchPersistWriterIT">
+        <classes>
+            <class name="org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT" />
+        </classes>
+    </test>
+
+    <test name="ElasticsearchHdfsIT">
+        <classes>
+            <class name="org.apache.streams.example.test.ElasticsearchHdfsIT" />
+        </classes>
+    </test>
+
+    <test name="HdfsElasticsearchIT">
+        <classes>
+            <class name="org.apache.streams.example.test.HdfsElasticsearchIT" />
+        </classes>
+    </test>
+
+</suite>
\ No newline at end of file
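Note: failsafe reads the suite from target/test-classes/testng.xml — the copy Maven places on the test classpath — and runs the three <test> blocks in declaration order, so ElasticsearchPersistWriterIT seeds the index that ElasticsearchHdfsIT then copies out. The suite can also be driven programmatically, e.g. from an IDE; a hypothetical sketch assuming the module's test classpath is available:

    // Hypothetical runner for the committed suite, outside of maven.
    import java.util.Collections
    import org.testng.TestNG

    object RunExampleITs {
      def main(args: Array[String]): Unit = {
        val tng = new TestNG()
        tng.setTestSuites(Collections.singletonList("target/test-classes/testng.xml"))
        tng.run()
        sys.exit(if (tng.hasFailure) 1 else 0)
      }
    }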
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala
new file mode 100644
index 0000000..aae12ec
--- /dev/null
+++ b/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala
@@ -0,0 +1,143 @@
+%spark.dep
+z.reset()
+z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot()
+z.load("org.apache.streams:streams-provider-facebook:0.4-incubating-SNAPSHOT")
+
+%spark
+import com.typesafe.config._
+import org.apache.streams.config._
+import org.apache.streams.core._
+import org.apache.streams.facebook._
+import org.apache.streams.facebook.graph._
+import java.util.Iterator
+
+%spark
+val credentials =
+  """
+    |facebook {
+    |  oauth {
+    |    appId = "299258633581961"
+    |    appSecret = 03b887d68ee4a3117f9f087330fe8c8f
+    |  }
+    |  userAccessTokens = [
+    |    EAACEdEose0cBAG4nq7ZB36wwCGv14UToDpZCwXgZA1ZCuShBp1tPQozsbxU75RaOEiJKx75sQgox6wCNgx6rCrEL5K96oNE9EoGutFPBPAEWBZAo7xlgfx715HhAdqdmoaaFTbwJWwruehr1FwIXJr2OAfsxFrqYbPYUkXXojAtSgoEm9WrhW6RRa7os6xBIZD
+    |  ]
+    |}
+    |""".stripMargin
+val credentialsConfig = ConfigFactory.parseString(credentials)
+
+%spark
+val accounts =
+  """
+    |facebook {
+    |  ids = [
+    |    {
+    |      #"id": "Apache-Software-Foundation"
+    |      "id": "108021202551732"
+    |    },
+    |    {
+    |      #"id": "Apache-Spark"
+    |      "id": "695067547183193"
+    |    },
+    |    {
+    |      # Apache-Cordova
+    |      "id": "144287225588642"
+    |    },
+    |    {
+    |      # Apache-HTTP-Server
+    |      "id": "107703115926025"
+    |    },
+    |    {
+    |      # Apache-Cassandra
+    |      "id": "136080266420061"
+    |    },
+    |    {
+    |      # Apache-Solr
+    |      "id": "333596995194"
+    |    },
+    |    {
+    |      # Apache-CXF
+    |      "id": "509899489117171"
+    |    },
+    |    {
+    |      # Apache-Kafka
+    |      "id": "109576742394607"
+    |    },
+    |    {
+    |      # Apache-Groovy
+    |      "id": "112510602100049"
+    |    },
+    |    {
+    |      # Apache-Hadoop
+    |      "id": "102175453157656"
+    |    },
+    |    {
+    |      # Apache-Hive
+    |      "id": "192818954063511"
+    |    },
+    |    {
+    |      # Apache-Mahout
+    |      "id": "109528065733066"
+    |    },
+    |    {
+    |      # Apache-HBase
+    |      "id": "103760282995363"
+    |    }
+    |  ]
+    |}
+    |""".stripMargin
+val accountsConfig = ConfigFactory.parseString(accounts)
+
+%spark
+val reference = ConfigFactory.load()
+val typesafe = accountsConfig.withFallback(credentialsConfig).withFallback(reference).resolve()
+val config = new ComponentConfigurator(classOf[FacebookUserInformationConfiguration]).detectConfiguration(typesafe, "facebook");
+
+%spark
+// Pull info on those accounts
+val FacebookPageProvider = new FacebookPageProvider(config);
+FacebookPageProvider.prepare(null)
+FacebookPageProvider.startStream()
+//
+val userdata_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+while(FacebookPageProvider.isRunning()) {
+  val resultSet = FacebookPageProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    userdata_buf += datum.getDocument
+  }
+}
+
+%spark
+//Pull activity from those accounts
+val FacebookPageFeedProvider = new FacebookPageFeedProvider(config);
+FacebookPageFeedProvider.prepare(null)
+FacebookPageFeedProvider.startStream()
+//
+val useractivity_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+while(FacebookPageFeedProvider.isRunning()) {
+  val resultSet = FacebookPageFeedProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    useractivity_buf += datum.getDocument
+  }
+}
+
+%spark
+//Normalize person(s) -> page(s)
+import scala.collection.JavaConversions._
+val FacebookTypeConverter = new FacebookTypeConverter(classOf[Page], classOf[Page])
+FacebookTypeConverter.prepare(null)
+val userdata_pages = userdata_buf.map(x => new StreamsDatum(x)).flatMap(x => FacebookTypeConverter.process(x))
+
+%spark
+//Normalize activities -> post(s)
+val FacebookTypeConverter = new FacebookTypeConverter(classOf[Post], classOf[Post])
+FacebookTypeConverter.prepare(null)
+val useractivity_posts = useractivity_buf.map(x => new StreamsDatum(x)).flatMap(x => FacebookTypeConverter.process(x))
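Note: the read loop above reappears verbatim in the gplus, twitter, and youtube notebooks below. Since every provider implements streams-core's StreamsProvider, a notebook could factor it into a helper — a sketch, not part of this commit:

    // Hypothetical helper: drain a StreamsProvider's documents into a local
    // buffer until it reports it has stopped running.
    import org.apache.streams.core.StreamsProvider
    import scala.collection.mutable.ArrayBuffer

    def drain(provider: StreamsProvider): ArrayBuffer[Object] = {
      val buf = ArrayBuffer.empty[Object]
      while (provider.isRunning) {
        val iterator = provider.readCurrent().iterator()
        while (iterator.hasNext) {
          buf += iterator.next().getDocument
        }
      }
      buf
    }

With that in scope, the cell above would reduce to val userdata_buf = drain(FacebookPageProvider).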
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala
new file mode 100644
index 0000000..9734ded
--- /dev/null
+++ b/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala
@@ -0,0 +1,68 @@
+
+
+
+
+%spark
+val reference = ConfigFactory.load()
+val typesafe = accountsConfig.withFallback(credentialsConfig).withFallback(reference).resolve()
+val config = new ComponentConfigurator(classOf[GPlusConfiguration]).detectConfiguration(typesafe, "gplus");
+
+%spark
+// Pull info on those accounts
+val GPlusUserDataProvider = new GPlusUserDataProvider(config);
+GPlusUserDataProvider.prepare(null)
+GPlusUserDataProvider.startStream()
+//
+val userdata_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+while(GPlusUserDataProvider.isRunning()) {
+  val resultSet = GPlusUserDataProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    userdata_buf += datum.getDocument
+  }
+}
+
+%spark
+//Pull activity from those accounts
+val GPlusUserActivityProvider = new GPlusUserActivityProvider(config);
+GPlusUserActivityProvider.prepare(null)
+GPlusUserActivityProvider.startStream()
+//
+val useractivity_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+while(GPlusUserActivityProvider.isRunning()) {
+  val resultSet = GPlusUserActivityProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    useractivity_buf += datum.getDocument
+  }
+}
+
+%spark
+//Normalize person(s) -> page(s)
+import scala.collection.JavaConversions._
+val GooglePlusTypeConverter = new GooglePlusTypeConverter()
+GooglePlusTypeConverter.prepare(null)
+val userdata_pages = userdata_buf.map(x => new StreamsDatum(x)).flatMap(x => GooglePlusTypeConverter.process(x))
+
+%spark
+import com.google.gson.ExclusionStrategy
+import com.google.gson.FieldAttributes
+
+class MyExclusionStrategy extends ExclusionStrategy {
+  override def shouldSkipField(f: FieldAttributes): Boolean =
+    f.getName.toLowerCase.contains("additionalproperties")
+  override def shouldSkipClass(clazz: Class[_]): Boolean = false
+}
+
+//Normalize activities -> post(s)
+val GooglePlusTypeConverter = new GooglePlusTypeConverter()
+GooglePlusTypeConverter.prepare(null)
+val useractivity_posts = useractivity_buf.map(x => new StreamsDatum(x)).flatMap(x => GooglePlusTypeConverter.process(x))

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala
new file mode 100644
index 0000000..7c67f75
--- /dev/null
+++ b/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala
@@ -0,0 +1,9 @@
+%spark.dep
+z.reset()
+z.load("org.apache.streams:streams-core:0.4-incubating")
+z.load("org.apache.streams:streams-converters:0.4-incubating")
+z.load("org.apache.streams:streams-pojo:0.4-incubating")
+z.load("org.apache.streams:streams-provider-twitter:0.4-incubating")
+z.load("org.apache.streams:streams-provider-facebook:0.4-incubating")
+z.load("org.apache.streams:streams-provider-youtube:0.4-incubating")
+z.load("org.apache.streams:google-gplus:0.4-incubating")
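Note: setup.scala pins released 0.4-incubating artifacts, while each notebook's own %spark.dep cell loads the matching provider as 0.4-incubating-SNAPSHOT from the snapshots repository; a single interpreter session should stick to one or the other to avoid version skew on the classpath. An all-snapshot variant of setup.scala would look like this (hypothetical, mirroring the per-notebook cells):

    %spark.dep
    // Hypothetical all-SNAPSHOT variant of setup.scala.
    z.reset()
    z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot()
    z.load("org.apache.streams:streams-core:0.4-incubating-SNAPSHOT")
    z.load("org.apache.streams:streams-converters:0.4-incubating-SNAPSHOT")
    z.load("org.apache.streams:streams-pojo:0.4-incubating-SNAPSHOT")
    z.load("org.apache.streams:streams-provider-twitter:0.4-incubating-SNAPSHOT")
    z.load("org.apache.streams:streams-provider-facebook:0.4-incubating-SNAPSHOT")
    z.load("org.apache.streams:streams-provider-youtube:0.4-incubating-SNAPSHOT")
    z.load("org.apache.streams:google-gplus:0.4-incubating-SNAPSHOT")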
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala
new file mode 100644
index 0000000..86c83d9
--- /dev/null
+++ b/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala
@@ -0,0 +1,173 @@
+%spark.dep
+z.reset()
+z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot()
+z.load("org.apache.streams:streams-provider-twitter:0.4-incubating-SNAPSHOT")
+
+%spark
+import com.typesafe.config._
+import org.apache.streams.config._
+import org.apache.streams.core._
+import java.util.Iterator
+import org.apache.streams.twitter.TwitterUserInformationConfiguration
+import org.apache.streams.twitter.pojo._
+import org.apache.streams.twitter.provider._
+
+%spark
+val consumerKey = z.input("ConsumerKey", "jF3awfLECUZ4tFAwS5bZcha8c")
+val consumerSecret = z.input("ConsumerSecret", "0IjoS5aPE88kNSREK6HNzAhUcJMziSlaT1fOkA5pzpusZLrhCj")
+val accessToken = z.input("AccessToken", "42232950-CzaYlt2M6SPGI883B5NZ8vROcL4qUsTJlp9wIM2K2")
+val accessTokenSecret = z.input("AccessTokenSecret", "vviQzladFUl23hdVelEiIknSLoHfAs40DqTv3RdXHhmz0")
+
+%spark
+val credentials_hocon = s"""
+  twitter {
+    oauth {
+      consumerKey = "$consumerKey"
+      consumerSecret = "$consumerSecret"
+      accessToken = "$accessToken"
+      accessTokenSecret = "$accessTokenSecret"
+    }
+    retrySleepMs = 5000
+    retryMax = 250
+  }
+"""
+
+%spark
+val accounts_hocon = s"""
+twitter.info = [
+# "ApacheSpark"
+  1551361069
+# "ApacheFlink"
+  2493948216
+# "ApacheKafka"
+  1287555762
+# "Hadoop"
+  175671289
+# "ApacheCassandra"
+  19041500
+# "ApacheSolr"
+  22742048
+# "ApacheMahout"
+  1123330975
+# "ApacheHive"
+  1188044936
+# "ApacheHbase"
+  2912997159
+]
+"""
+
+%spark
+val reference = ConfigFactory.load()
+val credentials = ConfigFactory.parseString(credentials_hocon)
+val accounts = ConfigFactory.parseString(accounts_hocon)
+val typesafe = accounts.withFallback(credentials).withFallback(reference).resolve()
+val twitterUserInformationConfiguration = new ComponentConfigurator(classOf[TwitterUserInformationConfiguration]).detectConfiguration(typesafe, "twitter");
+
+%spark
+val userdata_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+
+val twitterUserInformationProvider = new TwitterUserInformationProvider(twitterUserInformationConfiguration);
+twitterUserInformationProvider.prepare(null)
+twitterUserInformationProvider.startStream()
+while(twitterUserInformationProvider.isRunning()) {
+  val resultSet = twitterUserInformationProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    //println(datum.getDocument)
+    userdata_buf += datum.getDocument
+  }
+}
+userdata_buf.size
+
+%spark
+import com.typesafe.config._
+import org.apache.streams.config._
+import org.apache.streams.core._
+import java.util.Iterator
+import org.apache.streams.twitter.TwitterUserInformationConfiguration
+import org.apache.streams.twitter.pojo._
+import org.apache.streams.twitter.provider._
+
+val timeline_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+
+val twitterTimelineProvider = new TwitterTimelineProvider(twitterUserInformationConfiguration);
+twitterTimelineProvider.prepare(twitterUserInformationConfiguration)
+twitterTimelineProvider.startStream()
+while(twitterTimelineProvider.isRunning()) {
+  val resultSet = twitterTimelineProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    //println(datum.getDocument)
+    timeline_buf += datum.getDocument
+  }
+}
+timeline_buf.size
+
+%spark
+import org.apache.streams.converter.ActivityObjectConverterProcessor
+import org.apache.streams.core.StreamsProcessor
+import org.apache.streams.pojo.json.ActivityObject
+import scala.collection.JavaConverters
+import scala.collection.JavaConversions._
+
+val converter = new ActivityObjectConverterProcessor()
+converter.prepare(null)
+
+val user_datums = userdata_buf.map(x => new StreamsDatum(x))
+val actor_datums = user_datums.flatMap(x => converter.process(x))
+val pages = actor_datums.map(x => x.getDocument.asInstanceOf[ActivityObject])
+%spark
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import sqlContext._
+import sqlContext.implicits._
+
+val mapper = StreamsJacksonMapper.getInstance();
+val pages_jsons = pages.map(o => mapper.writeValueAsString(o))
+val pagesRDD = sc.parallelize(pages_jsons)
+
+val pagesDF = sqlContext.read.json(pagesRDD)
+
+// removePunctuationAndSpecialChar is assumed to be a UDF defined in another paragraph
+val pagescleanDF = pagesDF.withColumn("summary", removePunctuationAndSpecialChar(pagesDF("summary")))
+pagescleanDF.registerTempTable("twitter_pages")
+pagescleanDF.printSchema
+
+%spark
+import org.apache.streams.converter.ActivityConverterProcessor
+import org.apache.streams.core.StreamsProcessor
+import org.apache.streams.pojo.json.Activity
+import scala.collection.JavaConverters
+import scala.collection.JavaConversions._
+
+val converter = new ActivityConverterProcessor()
+converter.prepare(null)
+
+val status_datums = timeline_buf.map(x => new StreamsDatum(x))
+val activity_datums = status_datums.flatMap(x => converter.process(x)).map(x => x.getDocument.asInstanceOf[Activity])
+activity_datums.size
+
+%spark
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import sqlContext._
+import sqlContext.implicits._
+
+val mapper = StreamsJacksonMapper.getInstance();
+val jsons = activity_datums.map(o => mapper.writeValueAsString(o))
+val activitiesRDD = sc.parallelize(jsons)
+
+val activitiesDF = sqlContext.read.json(activitiesRDD)
+
+val cleanDF = activitiesDF.withColumn("content", removePunctuationAndSpecialChar(activitiesDF("content")))
+cleanDF.registerTempTable("twitter_posts")
+cleanDF.printSchema
+
+%spark.sql
+select id, displayName, handle, summary, extensions.favorites, extensions.followers, extensions.posts from twitter_pages
+
+%spark.sql
+select id, actor.id, content, published, rebroadcasts.count from twitter_posts
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala
new file mode 100644
index 0000000..2743952
--- /dev/null
+++ b/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala
@@ -0,0 +1,102 @@
+%spark.dep
+z.reset()
+z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot()
+z.load("org.apache.streams:streams-provider-youtube:0.4-incubating-SNAPSHOT")
+
+%spark
+import com.typesafe.config._
+import org.apache.streams.config._
+import org.apache.streams.core._
+import com.youtube.provider._
+import org.apache.youtube.pojo._
+import java.util.Iterator
+
+%spark
+val credentials =
+  """
+    |youtube {
+    |  apiKey = 79d9f9ca2796d1ec5334faf8d6efaa6456a297e6
+    |  oauth {
+    |    serviceAccountEmailAddress = "streams...@adroit-particle-764.iam.gserviceaccount.com"
+    |    pathToP12KeyFile = streams-c84fa47bd759.p12
+    |  }
+    |}
+    |""".stripMargin
+val credentialsConfig = ConfigFactory.parseString(credentials)
+
+%spark
+val accounts =
+  """
+    |youtube {
+    |  youtubeUsers = [
+    |    {
+    |      userId = "UCLDJ_V9KUOdOFSbDvPfGBxw"
+    |    }
+    |  ]
+    |}
+    |""".stripMargin
+val accountsConfig = ConfigFactory.parseString(accounts)
+
+%spark
+val reference = ConfigFactory.load()
+val typesafe = accountsConfig.withFallback(credentialsConfig).withFallback(reference).resolve()
+val config = new ComponentConfigurator(classOf[YoutubeConfiguration]).detectConfiguration(typesafe, "youtube");
+
+%spark
+// Pull info on those channels
+val YoutubeChannelProvider = new YoutubeChannelProvider(config);
+YoutubeChannelProvider.prepare(null)
+YoutubeChannelProvider.startStream()
+//
+val channel_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+while(YoutubeChannelProvider.isRunning()) {
+  val resultSet = YoutubeChannelProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    channel_buf += datum.getDocument
+  }
+}
+
+%spark
+//Pull activity from those accounts
+val YoutubeUserActivityProvider = new YoutubeUserActivityProvider(config);
+YoutubeUserActivityProvider.prepare(null)
+YoutubeUserActivityProvider.startStream()
+//
+val useractivity_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+while(YoutubeUserActivityProvider.isRunning()) {
+  val resultSet = YoutubeUserActivityProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    useractivity_buf += datum.getDocument
+  }
+}
+
+%spark
+import org.apache.streams.core.StreamsDatum
+import com.youtube.processor._
+import scala.collection.JavaConversions._
+//Normalize activities -> post(s)
+val YoutubeTypeConverter = new YoutubeTypeConverter()
+YoutubeTypeConverter.prepare(null)
+val useractivity_posts = useractivity_buf.map(x => new StreamsDatum(x)).flatMap(x => YoutubeTypeConverter.process(x))
+
+%spark
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+val mapper = StreamsJacksonMapper.getInstance();
+val activitiesRDD = sc.parallelize(useractivity_posts.map(o => mapper.writeValueAsString(o.getDocument)))
+
+val activitiesDF = sqlContext.read.json(activitiesRDD)
+
+activitiesDF.registerTempTable("activities")
+
+%spark.sql
+select count(id) from activities
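Note: the closing %spark.sql paragraph queries the "activities" temp table registered two cells earlier. The same count is available directly on the DataFrame, without going through the temp table — a hypothetical equivalent:

    // Hypothetical equivalent of the closing SQL paragraph.
    import org.apache.spark.sql.functions.count
    activitiesDF.select(count("id")).show()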