Repository: incubator-streams-examples
Updated Branches:
  refs/heads/master 3eb2e3526 -> 1fb1e0e11


switch elasticsearch-hdfs to testng
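For readers skimming the diff below, the substance of the change: the integration tests move from JUnit (@Before, junit/hamcrest asserts, a @RunWith suite class) to TestNG (@BeforeClass, org.testng asserts, a testng.xml suite), and maven-failsafe-plugin now runs that suite via suiteXmlFiles with the surefire-testng provider. A minimal sketch of the resulting test shape, for orientation only (the class and method names here are illustrative, not part of this commit):

    import org.testng.annotations.BeforeClass;
    import org.testng.annotations.Test;

    public class ExampleIT {

        @BeforeClass
        public void prepareTest() throws Exception {
            // load configuration and open clients once per test class
        }

        @Test
        public void testRun() throws Exception {
            // run the stream, then assert on the resulting document counts
        }
    }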


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/47f159e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/47f159e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/47f159e1

Branch: refs/heads/master
Commit: 47f159e1e3ba01ce24fbb80a20223be2f1c1efd5
Parents: 3eb2e35
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Authored: Thu Dec 15 12:20:51 2016 -0600
Committer: Steve Blackmon @steveblackmon <sblack...@apache.org>
Committed: Thu Dec 15 12:20:51 2016 -0600

----------------------------------------------------------------------
 local/elasticsearch-hdfs/pom.xml                |  77 +++++----
 .../streams/example/ElasticsearchHdfs.java      |  11 +-
 .../streams/example/HdfsElasticsearch.java      |  13 +-
 .../example/test/ElasticsearchHdfsIT.java       |  25 +--
 .../apache/streams/example/test/ExampleITs.java |  35 ----
 .../example/test/HdfsElasticsearchIT.java       |  26 +--
 .../src/test/resources/testng.xml               |  24 +++
 .../src/main/scala/facebook.scala               | 143 +++++++++++++++
 .../src/main/scala/gplus.scala                  |  68 ++++++++
 .../src/main/scala/setup.scala                  |   9 +
 .../src/main/scala/twitter.scala                | 173 +++++++++++++++++++
 .../src/main/scala/youtube.scala                | 102 +++++++++++
 12 files changed, 603 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/local/elasticsearch-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/pom.xml b/local/elasticsearch-hdfs/pom.xml
index 5830d34..1eabd45 100644
--- a/local/elasticsearch-hdfs/pom.xml
+++ b/local/elasticsearch-hdfs/pom.xml
@@ -39,31 +39,37 @@
     </properties>
 
     <dependencies>
-    <!-- Test includes -->
-    <dependency>
-        <groupId>org.apache.lucene</groupId>
-        <artifactId>lucene-test-framework</artifactId>
-        <version>${lucene.version}</version>
-        <scope>test</scope>
-    </dependency>
-    <dependency>
-        <groupId>org.apache.lucene</groupId>
-        <artifactId>lucene-codecs</artifactId>
-        <version>${lucene.version}</version>
-        <scope>test</scope>
-    </dependency>
-    <dependency>
-        <groupId>org.elasticsearch</groupId>
-        <artifactId>elasticsearch</artifactId>
-        <version>${elasticsearch.version}</version>
-        <type>test-jar</type>
-    </dependency>
-    <dependency>
-        <groupId>org.hamcrest</groupId>
-        <artifactId>hamcrest-all</artifactId>
-        <version>1.3</version>
-        <scope>test</scope>
-    </dependency>
+        <!-- Test includes -->
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-test-framework</artifactId>
+            <version>${lucene.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-codecs</artifactId>
+            <version>${lucene.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${elasticsearch.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>${testng.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-core</artifactId>
@@ -284,19 +290,22 @@
                 <artifactId>maven-failsafe-plugin</artifactId>
                 <configuration>
                    <!-- Run integration test suite rather than individual tests. -->
-                    <excludes>
-                        <exclude>**/*Test.java</exclude>
-                        <exclude>**/*Tests.java</exclude>
-                        <exclude>**/*IT.java</exclude>
-                    </excludes>
-                    <includes>
-                        <include>**/*ITs.java</include>
-                    </includes>
+                    <suiteXmlFiles>
+                        <suiteXmlFile>target/test-classes/testng.xml</suiteXmlFile>
+                    </suiteXmlFiles>
+                    <!--<excludes>-->
+                        <!--<exclude>**/*Test.java</exclude>-->
+                        <!--<exclude>**/*Tests.java</exclude>-->
+                    <!--</excludes>-->
+                    <!--<includes>-->
+                        <!--<exclude>**/*IT.java</exclude>-->
+                        <!--<include>**/*ITs.java</include>-->
+                    <!--</includes>-->
                 </configuration>
                 <dependencies>
                     <dependency>
                         <groupId>org.apache.maven.surefire</groupId>
-                        <artifactId>surefire-junit47</artifactId>
+                        <artifactId>surefire-testng</artifactId>
                         <version>${failsafe.plugin.version}</version>
                     </dependency>
                 </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
index 8d3cf36..be79f4a 100644
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
@@ -18,14 +18,15 @@
 
 package org.apache.streams.example;
 
-import com.google.common.collect.Maps;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
-import org.apache.streams.example.ElasticsearchHdfsConfiguration;
 import org.apache.streams.hdfs.WebHdfsPersistWriter;
-import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.local.builders.LocalStreamBuilder;
+
+import com.google.common.collect.Maps;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,8 +74,8 @@ public class ElasticsearchHdfs implements Runnable {
         streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
         StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
 
-        builder.newPerpetualStream(ElasticsearchPersistReader.STREAMS_ID, elasticsearchPersistReader);
-        builder.addStreamsPersistWriter(WebHdfsPersistWriter.STREAMS_ID, hdfsPersistWriter, 1, ElasticsearchPersistReader.STREAMS_ID);
+        builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
+        builder.addStreamsPersistWriter(WebHdfsPersistWriter.class.getCanonicalName(), hdfsPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
         builder.start();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
index 847ac48..375665c 100644
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
@@ -18,14 +18,15 @@
 
 package org.apache.streams.example;
 
-import com.google.common.collect.Maps;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.example.HdfsElasticsearchConfiguration;
-import org.apache.streams.hdfs.WebHdfsPersistReader;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
 import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.hdfs.WebHdfsPersistReader;
 import org.apache.streams.local.builders.LocalStreamBuilder;
+
+import com.google.common.collect.Maps;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,8 +74,8 @@ public class HdfsElasticsearch implements Runnable {
         streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000 * 1000);
         StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
 
-        builder.newPerpetualStream(WebHdfsPersistReader.STREAMS_ID, webHdfsPersistReader);
-        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.STREAMS_ID, elasticsearchPersistWriter, 1, WebHdfsPersistReader.STREAMS_ID);
+        builder.newPerpetualStream(WebHdfsPersistReader.class.getCanonicalName(), webHdfsPersistReader);
+        builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, WebHdfsPersistReader.class.getCanonicalName());
         builder.start();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
index 86d932b..437ebf6 100644
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
@@ -18,15 +18,17 @@
 
 package org.apache.streams.example.test;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.example.ElasticsearchHdfs;
 import org.apache.streams.example.ElasticsearchHdfsConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -36,15 +38,16 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
 
 import java.io.File;
 
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertNotEquals;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.testng.Assert.assertNotEquals;
 
 /**
  * Test copying documents between hdfs and elasticsearch
@@ -60,7 +63,7 @@ public class ElasticsearchHdfsIT {
 
     private int count = 0;
 
-    @Before
+    @BeforeClass
     public void prepareTest() throws Exception {
 
         Config reference  = ConfigFactory.load();
@@ -69,7 +72,7 @@ public class ElasticsearchHdfsIT {
         Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
         Config typesafe  = testResourceConfig.withFallback(reference).resolve();
         testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
-        testClient = new ElasticsearchClientManager(testConfiguration.getSource()).getClient();
+        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
 
         ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
         ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
@@ -77,7 +80,7 @@ public class ElasticsearchHdfsIT {
 
         IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
         IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
+        assertThat(indicesExistsResponse.isExists(), is(true));
 
         SearchRequestBuilder countRequest = testClient
                 .prepareSearch(testConfiguration.getSource().getIndexes().get(0))

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java
deleted file mode 100644
index a9b7ecf..0000000
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ExampleITs.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.example.test;
-
-import org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
-        ElasticsearchPersistWriterIT.class,
-        ElasticsearchHdfsIT.class,
-        HdfsElasticsearchIT.class,
-})
-
-public class ExampleITs {
-    // the class remains empty,
-    // used only as a holder for the above annotations
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
index 35a32e7..a629025 100644
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
@@ -18,15 +18,17 @@
 
 package org.apache.streams.example.test;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.example.HdfsElasticsearch;
 import org.apache.streams.example.HdfsElasticsearchConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -38,16 +40,16 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.junit.Before;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
 
 import java.io.File;
 
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.AssertJUnit.assertTrue;
 
 /**
  * Test copying documents between hdfs and elasticsearch
@@ -61,7 +63,7 @@ public class HdfsElasticsearchIT {
     protected HdfsElasticsearchConfiguration testConfiguration;
     protected Client testClient;
 
-    @Before
+    @BeforeClass
     public void prepareTest() throws Exception {
 
         Config reference  = ConfigFactory.load();
@@ -70,7 +72,7 @@ public class HdfsElasticsearchIT {
         Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
         Config typesafe  = testResourceConfig.withFallback(reference).resolve();
         testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
-        testClient = new ElasticsearchClientManager(testConfiguration.getDestination()).getClient();
+        testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
 
         ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
         ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
@@ -101,7 +103,7 @@ public class HdfsElasticsearchIT {
                 .setTypes(testConfiguration.getDestination().getType());
         SearchResponse countResponse = countRequest.execute().actionGet();
 
-        assertEquals(89, countResponse.getHits().getTotalHits());
+        assertEquals(countResponse.getHits().getTotalHits(), 89);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/local/elasticsearch-hdfs/src/test/resources/testng.xml
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/resources/testng.xml b/local/elasticsearch-hdfs/src/test/resources/testng.xml
new file mode 100644
index 0000000..0110e1d
--- /dev/null
+++ b/local/elasticsearch-hdfs/src/test/resources/testng.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+
+<suite name="ExampleITs">
+
+    <test name="ElasticsearchPersistWriterIT">
+        <classes>
+            <class name="org.apache.streams.elasticsearch.test.ElasticsearchPersistWriterIT" />
+        </classes>
+    </test>
+
+    <test name="ElasticsearchHdfsIT">
+        <classes>
+            <class name="org.apache.streams.example.test.ElasticsearchHdfsIT" />
+        </classes>
+    </test>
+
+    <test name="HdfsElasticsearchIT">
+        <classes>
+            <class name="org.apache.streams.example.test.HdfsElasticsearchIT" />
+        </classes>
+    </test>
+
+</suite>  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala
new file mode 100644
index 0000000..aae12ec
--- /dev/null
+++ b/zeppelin/apache-zeppelin-dashboard/src/main/scala/facebook.scala
@@ -0,0 +1,143 @@
+%spark.dep
+z.reset()
+z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot()
+z.load("org.apache.streams:streams-provider-facebook:0.4-incubating-SNAPSHOT")
+
+%spark
+import com.typesafe.config._
+import org.apache.streams.config._
+import org.apache.streams.core._
+import org.apache.streams.facebook._
+import org.apache.streams.facebook.graph._
+import java.util.Iterator
+
+%spark
+val credentials =
+  """
+    |facebook {
+    |  oauth {
+    |    appId = "299258633581961"
+    |    appSecret = 03b887d68ee4a3117f9f087330fe8c8f
+    |  }
+    |  userAccessTokens = [
+    |EAACEdEose0cBAG4nq7ZB36wwCGv14UToDpZCwXgZA1ZCuShBp1tPQozsbxU75RaOEiJKx75sQgox6wCNgx6rCrEL5K96oNE9EoGutFPBPAEWBZAo7xlgfx715HhAdqdmoaaFTbwJWwruehr1FwIXJr2OAfsxFrqYbPYUkXXojAtSgoEm9WrhW6RRa7os6xBIZD
+    |  ]
+    |}
+    |"""
+val credentialsConfig = ConfigFactory.parseString(credentials)
+
+%spark
+val accounts =
+  """
+    |facebook {
+    |  ids = [
+    |    {
+    |      #"id": "Apache-Software-Foundation"
+    |      "id": "108021202551732"
+    |    },
+    |    {
+    |      #"id": "Apache-Spark"
+    |      "id": "695067547183193"
+    |    },
+    |    {
+    |      # Apache-Cordova
+    |      "id": "144287225588642"
+    |    },
+    |    {
+    |      # Apache-HTTP-Server
+    |      "id": "107703115926025"
+    |    },
+    |    {
+    |      # Apache-Cassandra
+    |      "id": "136080266420061"
+    |    },
+    |    {
+    |      # Apache-Solr
+    |      "id": "333596995194"
+    |    },
+    |    {
+    |      # Apache-CXF
+    |      "id": "509899489117171"
+    |    },
+    |    {
+    |      # Apache-Kafka
+    |      "id": "109576742394607"
+    |    },
+    |    {
+    |      # Apache-Groovy
+    |      "id": "112510602100049"
+    |    },
+    |    {
+    |      # Apache-Hadoop
+    |      "id": "102175453157656"
+    |    },
+    |    {
+    |      # Apache-Hive
+    |      "id": "192818954063511"
+    |    },
+    |    {
+    |      # Apache-Mahout
+    |      "id": "109528065733066"
+    |    },
+    |    {
+    |      # Apache-HBase
+    |      "id": "103760282995363"
+    |    }
+    |  ]
+    |}
+    |"""
+val accountsConfig = ConfigFactory.parseString(accounts)
+
+%spark
+val reference = ConfigFactory.load()
+val typesafe = accountsConfig.withFallback(credentialsConfig).withFallback(reference).resolve()
+val config = new ComponentConfigurator(classOf[FacebookUserInformationConfiguration]).detectConfiguration(typesafe, "facebook");
+
+%spark
+// Pull info on those accounts
+val FacebookPageProvider = new FacebookPageProvider(config);
+FacebookPageProvider.prepare(null)
+FacebookPageProvider.startStream()
+//
+val userdata_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+while(FacebookPageProvider.isRunning()) {
+  val resultSet = FacebookPageProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    userdata_buf += datum.getDocument
+  }
+}
+
+%spark
+//Pull activity from those accounts
+val FacebookPageFeedProvider = new FacebookPageFeedProvider(config);
+FacebookPageFeedProvider.prepare(null)
+FacebookPageFeedProvider.startStream()
+while(FacebookPageFeedProvider.isRunning())
+//
+val useractivity_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+while(FacebookPageFeedProvider.isRunning()) {
+  val resultSet = FacebookPageFeedProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    useractivity_buf += datum.getDocument
+  }
+}
+
+%spark
+//Normalize person(s) -> page(s)
+val FacebookTypeConverter = new FacebookTypeConverter(classOf[Page], classOf[Page])
+FacebookTypeConverter.prepare()
+val userdata_pages = userdata_buf.flatMap(x => FacebookTypeConverter.process(x))
+
+%spark
+//Normalize activities) -> posts(s)
+val FacebookTypeConverter = new FacebookTypeConverter(classOf[Post], classOf[Post])
+FacebookTypeConverter.prepare()
+val useractivity_posts = useractivity_buf.flatMap(x => FacebookTypeConverter.process(x))
+
+

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala
new file mode 100644
index 0000000..9734ded
--- /dev/null
+++ b/zeppelin/apache-zeppelin-dashboard/src/main/scala/gplus.scala
@@ -0,0 +1,68 @@
+
+
+
+
+%spark
+val reference = ConfigFactory.load()
+val typesafe = accountsConfig.withFallback(credentialsConfig).withFallback(reference).resolve()
+val config = new ComponentConfigurator(classOf[GPlusConfiguration]).detectConfiguration(typesafe, "gplus");
+
+%spark
+// Pull info on those accounts
+val GPlusUserDataProvider = new GPlusUserDataProvider(config);
+GPlusUserDataProvider.prepare(null)
+GPlusUserDataProvider.startStream()
+//
+val userdata_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+while(GPlusUserDataProvider.isRunning()) {
+  val resultSet = GPlusUserDataProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    userdata_buf += datum.getDocument
+  }
+}
+
+%spark
+//Pull activity from those accounts
+val GPlusUserActivityProvider = new GPlusUserActivityProvider(config);
+GPlusUserActivityProvider.prepare(null)
+GPlusUserActivityProvider.startStream()
+while(GPlusUserActivityProvider.isRunning())
+//
+val useractivity_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+while(GPlusUserActivityProvider.isRunning()) {
+  val resultSet = GPlusUserActivityProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    useractivity_buf += datum.getDocument
+  }
+}
+
+%spark
+//Normalize person(s) -> page(s)
+val GooglePlusTypeConverter = new GooglePlusTypeConverter()
+GooglePlusTypeConverter.prepare()
+val userdata_pages = userdata_buf.flatMap(x => GooglePlusTypeConverter.process(x))
+
+
+%spark
+import com.google.gson.ExclusionStrategy
+import com.google.gson.FieldAttributes
+import com.sun.javafx.runtime.async.AbstractAsyncOperation
+import sun.jvm.hotspot.runtime.NativeSignatureIterator
+class MyExclusionStrategy extends ExclusionStrategy {
+  def shouldSkipField(f: FieldAttributes) : Boolean {
+    f.getName().toLowerCase().contains("additionalProperties");
+  }
+}
+
+//Normalize activities) -> posts(s)
+val GooglePlusTypeConverter = new GooglePlusTypeConverter()
+GooglePlusTypeConverter.prepare()
+val useractivity_posts = useractivity_buf.flatMap(x => GooglePlusTypeConverter.process(x))
+
+

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala
new file mode 100644
index 0000000..7c67f75
--- /dev/null
+++ b/zeppelin/apache-zeppelin-dashboard/src/main/scala/setup.scala
@@ -0,0 +1,9 @@
+%spark.dep
+z.reset()
+z.load("org.apache.streams:streams-core:0.4-incubating")
+z.load("org.apache.streams:streams-converters:0.4-incubating")
+z.load("org.apache.streams:streams-pojo:0.4-incubating")
+z.load("org.apache.streams:streams-provider-twitter:0.4-incubating")
+z.load("org.apache.streams:streams-provider-facebook:0.4-incubating")
+z.load("org.apache.streams:streams-provider-youtube:0.4-incubating")
+z.load("org.apache.streams:google-gplus:0.4-incubating")

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala
new file mode 100644
index 0000000..86c83d9
--- /dev/null
+++ b/zeppelin/apache-zeppelin-dashboard/src/main/scala/twitter.scala
@@ -0,0 +1,173 @@
+%spark.dep
+z.reset()
+z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot()
+z.load("org.apache.streams:streams-provider-twitter:0.4-incubating-SNAPSHOT")
+
+%spark
+import com.typesafe.config._
+import org.apache.streams.config._
+import org.apache.streams.core._
+import java.util.Iterator
+import org.apache.streams.twitter.TwitterUserInformationConfiguration
+import org.apache.streams.twitter.pojo._
+import org.apache.streams.twitter.provider._
+
+%spark
+val consumerKey = z.input("ConsumerKey", "jF3awfLECUZ4tFAwS5bZcha8c")
+val consumerSecret = z.input("ConsumerSecret", "0IjoS5aPE88kNSREK6HNzAhUcJMziSlaT1fOkA5pzpusZLrhCj")
+val accessToken = z.input("AccessToken", "42232950-CzaYlt2M6SPGI883B5NZ8vROcL4qUsTJlp9wIM2K2")
+val accessTokenSecret = z.input("AccessTokenSecret", "vviQzladFUl23hdVelEiIknSLoHfAs40DqTv3RdXHhmz0")
+
+%spark
+val credentials_hocon = s"""
+    twitter {
+      oauth {
+       consumerKey = "$consumerKey"
+    consumerSecret = "$consumerSecret"
+    accessToken = "$accessToken"
+    accessTokenSecret = "$accessTokenSecret"
+      }
+      retrySleepMs = 5000
+  retryMax = 250
+    }
+"""
+
+%spark
+val accounts_hocon = s"""
+twitter.info = [
+#    "ApacheSpark"
+    1551361069
+#    "ApacheFlink"
+    2493948216
+#    "ApacheKafka"
+    1287555762
+#   "Hadoop"
+    175671289
+#   "ApacheCassandra"
+    19041500
+#   "ApacheSolr"
+    22742048
+#   "ApacheMahout"
+    1123330975
+#   "ApacheHive"
+    1188044936
+#   "ApacheHbase"
+    2912997159
+]
+"""
+
+%spark
+val reference = ConfigFactory.load()
+val credentials = ConfigFactory.parseString(credentials_hocon)
+val accounts = ConfigFactory.parseString(accounts_hocon)
+val typesafe = accounts.withFallback(credentials).withFallback(reference).resolve()
+val twitterUserInformationConfiguration = new ComponentConfigurator(classOf[TwitterUserInformationConfiguration]).detectConfiguration(typesafe, "twitter");
+
+%spark
+val userdata_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+
+val twitterUserInformationProvider = new TwitterUserInformationProvider(twitterUserInformationConfiguration);
+twitterUserInformationProvider.prepare()
+twitterUserInformationProvider.startStream()
+while(twitterUserInformationProvider.isRunning()) {
+  val resultSet = twitterUserInformationProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    //println(datum.getDocument)
+    userdata_buf += datum.getDocument
+  }
+}
+userdata_buf.size
+
+%spark
+import com.typesafe.config._
+import org.apache.streams.config._
+import org.apache.streams.core._
+import java.util.Iterator
+import org.apache.streams.twitter.TwitterUserInformationConfiguration
+
+import org.apache.streams.twitter.pojo._
+import org.apache.streams.twitter.provider._
+
+val timeline_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+
+val twitterTimelineProvider = new TwitterTimelineProvider(twitterUserInformationConfiguration);
+twitterTimelineProvider.prepare(twitterUserInformationConfiguration)
+twitterTimelineProvider.startStream()
+while(twitterTimelineProvider.isRunning()) {
+  val resultSet = twitterTimelineProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    //println(datum.getDocument)
+    timeline_buf += datum.getDocument
+  }
+}
+timeline_buf.size
+
+%spark
+import org.apache.streams.converter.ActivityObjectConverterProcessor
+import org.apache.streams.core.StreamsProcessor
+import org.apache.streams.pojo.json.ActivityObject
+import scala.collection.JavaConverters
+import scala.collection.JavaConversions._
+
+val converter = new ActivityObjectConverterProcessor()
+converter.prepare()
+
+val user_datums = userdata_buf.map(x => new StreamsDatum(x))
+val actor_datums = user_datums.flatMap(x => converter.process(x))
+val pages = actor_datums.map(x => x.getDocument.asInstanceOf[ActivityObject])
+
+%spark
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import sqlContext._
+import sqlContext.implicits._
+
+val mapper = StreamsJacksonMapper.getInstance();
+val pages_jsons = pages.map(o => mapper.writeValueAsString(o))
+val pagesRDD = sc.parallelize(pages_jsons)
+
+val pagesDF = sqlContext.read.json(pagesRDD)
+
+val pagescleanDF = pagesDF.withColumn("summary", removePunctuationAndSpecialChar(pagesDF("summary")))
+pagescleanDF.registerTempTable("twitter_pages")
+pagescleanDF.printSchema
+
+%spark
+import org.apache.streams.converter.ActivityConverterProcessor
+import org.apache.streams.core.StreamsProcessor
+import org.apache.streams.pojo.json.Activity
+import scala.collection.JavaConverters
+import scala.collection.JavaConversions._
+
+val converter = new ActivityConverterProcessor()
+converter.prepare()
+
+val status_datums = timeline_buf.map(x => new StreamsDatum(x))
+val activity_datums = status_datums.flatMap(x => converter.process(x)).map(x => x.getDocument.asInstanceOf[Activity])
+activity_datums.size
+
+%spark
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import sqlContext._
+import sqlContext.implicits._
+
+val mapper = StreamsJacksonMapper.getInstance();
+val jsons = activity_datums.map(o => mapper.writeValueAsString(o))
+val activitiesRDD = sc.parallelize(jsons)
+
+val activitiesDF = sqlContext.read.json(activitiesRDD)
+
+val cleanDF = activitiesDF.withColumn("content", removePunctuationAndSpecialChar(activitiesDF("content")))
+cleanDF.registerTempTable("twitter_posts")
+cleanDF.printSchema
+
+%spark.sql
+select id, displayName, handle, summary, extensions.favorites, extensions.followers, extensions.posts from twitter_pages
+
+%spark.sql
+select id, actor.id, content, published, rebroadcasts.count from twitter_posts

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/47f159e1/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala
----------------------------------------------------------------------
diff --git a/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala b/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala
new file mode 100644
index 0000000..2743952
--- /dev/null
+++ b/zeppelin/apache-zeppelin-dashboard/src/main/scala/youtube.scala
@@ -0,0 +1,102 @@
+%spark.dep
+z.reset()
+z.addRepo("apache-snapshots").url("https://repository.apache.org/content/repositories/snapshots").snapshot()
+z.load("org.apache.streams:streams-provider-youtube:0.4-incubating-SNAPSHOT")
+
+%spark
+import com.typesafe.config._
+import org.apache.streams.config._
+import org.apache.streams.core._
+import com.youtube.provider._
+import org.apache.youtube.pojo._
+import java.util.Iterator
+
+%spark
+val credentials =
+  """
+  |youtube {
+  |  apiKey = 79d9f9ca2796d1ec5334faf8d6efaa6456a297e6
+  |  oauth {
+  |    serviceAccountEmailAddress = "streams...@adroit-particle-764.iam.gserviceaccount.com"
+  |    pathToP12KeyFile = streams-c84fa47bd759.p12
+  |  }
+  |}
+  |"""
+val credentialsConfig = ConfigFactory.parseString(credentials)
+
+%spark
+val accounts =
+  """
+    |youtube {
+    |  youtubeUsers = [
+    |    {
+    |      userId = "UCLDJ_V9KUOdOFSbDvPfGBxw"
+    |    }
+    |  ]
+    |}
+    |"""
+val accountsConfig = ConfigFactory.parseString(accounts)
+
+%spark
+val reference = ConfigFactory.load()
+val typesafe = accountsConfig.withFallback(credentialsConfig).withFallback(reference).resolve()
+val config = new ComponentConfigurator(classOf[YoutubeConfiguration]).detectConfiguration(typesafe, "youtube");
+
+%spark
+// Pull info on those channels
+val YoutubeChannelProvider = new YoutubeChannelProvider(config);
+YoutubeChannelProvider.prepare(null)
+YoutubeChannelProvider.startStream()
+//
+val channel_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+while(YoutubeChannelProvider.isRunning()) {
+  val resultSet = YoutubeChannelProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    channel_buf += datum.getDocument
+  }
+}
+
+%spark
+//Pull activity from those accounts
+val YoutubeUserActivityProvider = new YoutubeUserActivityProvider(config);
+YoutubeUserActivityProvider.prepare(null)
+YoutubeUserActivityProvider.startStream()
+while(YoutubeUserActivityProvider.isRunning())
+//
+val useractivity_buf = scala.collection.mutable.ArrayBuffer.empty[Object]
+while(YoutubeUserActivityProvider.isRunning()) {
+  val resultSet = YoutubeUserActivityProvider.readCurrent()
+  resultSet.size()
+  val iterator = resultSet.iterator();
+  while(iterator.hasNext()) {
+    val datum = iterator.next();
+    useractivity_buf += datum.getDocument
+  }
+}
+
+%spark
+import org.apache.streams.core.StreamsDatum
+import com.youtube.processor._
+import scala.collection.JavaConversions._
+//Normalize activities -> posts(s)
+val YoutubeTypeConverter = new YoutubeTypeConverter()
+YoutubeTypeConverter.prepare()
+val useractivity_posts = useractivity_buf.flatMap(x => YoutubeTypeConverter.process(x))
+
+%spark
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+val mapper = StreamsJacksonMapper.getInstance();
+val activitiesRDD = sc.parallelize(useractivity_posts.map(o => mapper.writeValueAsString(o)))
+
+val activitiesDF = sqlContext.read.json(activitiesRDD)
+
+activitiesDF.registerTempTable("activities")
+
+%spark.sql
+select count(id) from activitiesDF

