Repository: incubator-streams
Updated Branches:
  refs/heads/master cae682d6e -> 09ebd2e8c


level up facebook provider

update jFacebook client version (STREAMS-413)
use provider without a runtime (STREAMS-403)
add main methods to each Provider (STREAMS-411)
add real integration tests (STREAMS-415)
isolate isRunning and readCurrent (STREAMS-425)
FacebookPostSerDeIT was failing, now it’s passing


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/112fba08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/112fba08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/112fba08

Branch: refs/heads/master
Commit: 112fba0896d0588ea236a42ed5cc7ff95987260f
Parents: 3234cdb
Author: Steve Blackmon @steveblackmon <[email protected]>
Authored: Thu Oct 13 15:57:58 2016 -0500
Committer: Steve Blackmon @steveblackmon <[email protected]>
Committed: Thu Oct 13 15:57:58 2016 -0500

----------------------------------------------------------------------
 .../streams-provider-facebook/pom.xml           |   2 +-
 .../provider/FacebookDataCollector.java         |  15 +-
 .../facebook/provider/FacebookProvider.java     |  33 +++-
 .../provider/page/FacebookPageProvider.java     |  79 +++++++-
 .../pagefeed/FacebookPageFeedProvider.java      |  77 ++++++++
 .../src/main/resources/facebook.conf            |  16 --
 .../src/main/resources/reference.conf           |  17 ++
 .../src/site/markdown/index.md                  |  29 ++-
 .../provider/page/TestFacebookPageProvider.java | 186 -------------------
 .../facebook/provider/page/TestPage.java        | 125 -------------
 .../provider/pagefeed/TestFacebookProvider.java |  96 ----------
 .../test/FacebookActivityActorSerDeIT.java      |  67 -------
 .../facebook/test/FacebookActivitySerDeIT.java  |  74 --------
 .../facebook/test/FacebookPageSerDeIT.java      |  77 --------
 .../facebook/test/FacebookPostSerDeIT.java      |  98 ----------
 .../apache/streams/facebook/test/TestPage.java  | 145 +++++++++++++++
 .../test/data/FacebookActivityActorSerDeIT.java |  66 +++++++
 .../test/data/FacebookActivitySerDeIT.java      |  73 ++++++++
 .../facebook/test/data/FacebookPageSerDeIT.java |  76 ++++++++
 .../facebook/test/data/FacebookPostSerDeIT.java |  97 ++++++++++
 .../test/providers/TestFacebookProvider.java    |  96 ++++++++++
 .../providers/page/FacebookPageProviderIT.java  |  54 ++++++
 .../pagefeed/FacebookPageFeedProviderIT.java    |  52 ++++++
 .../resources/FacebookPageFeedProviderIT.conf   |  23 +++
 .../test/resources/FacebookPageProviderIT.conf  |  23 +++
 25 files changed, 938 insertions(+), 758 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/pom.xml 
b/streams-contrib/streams-provider-facebook/pom.xml
index dd14d66..4803ce2 100644
--- a/streams-contrib/streams-provider-facebook/pom.xml
+++ b/streams-contrib/streams-provider-facebook/pom.xml
@@ -87,7 +87,7 @@
         <dependency>
             <groupId>org.facebook4j</groupId>
             <artifactId>facebook4j-core</artifactId>
-            <version>2.1.0</version>
+            <version>2.4.7</version>
         </dependency>
         <dependency>
             <groupId>org.mockito</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
index f3bcde6..33ee9dc 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
@@ -18,6 +18,7 @@
 package org.apache.streams.facebook.provider;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 import facebook4j.Facebook;
 import facebook4j.FacebookFactory;
 import facebook4j.conf.ConfigurationBuilder;
@@ -94,17 +95,19 @@ public abstract class FacebookDataCollector implements 
Runnable {
      */
     protected Facebook getNextFacebookClient() {
             ConfigurationBuilder cb = new ConfigurationBuilder();
-            cb.setDebugEnabled(true)
-                    .setOAuthAppId(this.config.getOauth().getAppId())
-                    .setOAuthAppSecret(this.config.getOauth().getAppSecret());
+            cb.setDebugEnabled(true);
+            cb.setOAuthPermissions(READ_ONLY);
+            cb.setOAuthAppId(this.config.getOauth().getAppId());
+            cb.setOAuthAppSecret(this.config.getOauth().getAppSecret());
             if(this.authTokens.numAvailableTokens() > 0)
-                    
cb.setOAuthAccessToken(this.authTokens.getNextAvailableToken());
+                
cb.setOAuthAccessToken(this.authTokens.getNextAvailableToken());
             else {
                 
cb.setOAuthAccessToken(this.config.getOauth().getAppAccessToken());
                 LOGGER.debug("appAccessToken : {}", 
this.config.getOauth().getAppAccessToken());
             }
-                    cb.setOAuthPermissions(READ_ONLY)
-                    .setJSONStoreEnabled(true);
+            cb.setJSONStoreEnabled(true);
+            if(!Strings.isNullOrEmpty(config.getVersion()))
+                cb.setRestBaseURL("https://graph.facebook.com/" + 
config.getVersion() + "/");
             LOGGER.debug("appId : {}", this.config.getOauth().getAppId());
             LOGGER.debug("appSecret: {}", 
this.config.getOauth().getAppSecret());
             FacebookFactory ff = new FacebookFactory(cb.build());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
index 563e0e9..e907082 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
@@ -20,6 +20,10 @@ package org.apache.streams.facebook.provider;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.ConfigRenderOptions;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
@@ -36,6 +40,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.*;
@@ -56,7 +62,9 @@ public abstract class FacebookProvider implements 
StreamsProvider {
     protected BlockingQueue<StreamsDatum> datums;
 
     private AtomicBoolean isComplete;
-    private ExecutorService executor;
+    private ListeningExecutorService executor;
+    List<ListenableFuture<Object>> futures = new ArrayList<>();
+
     private FacebookDataCollector dataCollector;
 
     public FacebookProvider() {
@@ -78,8 +86,9 @@ public abstract class FacebookProvider implements 
StreamsProvider {
 
     @Override
     public void startStream() {
-        this.dataCollector = getDataCollector();
-        this.executor.submit(dataCollector);
+        ListenableFuture future = executor.submit(getDataCollector());
+        futures.add(future);
+        executor.shutdown();
     }
 
     protected abstract FacebookDataCollector getDataCollector();
@@ -92,7 +101,6 @@ public abstract class FacebookProvider implements 
StreamsProvider {
             
ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.datums), 
batch);
             ++batchSize;
         }
-        this.isComplete.set(batch.isEmpty() && this.datums.isEmpty() && 
this.dataCollector.isComplete());
         return new StreamsResultSet(batch);
     }
 
@@ -107,15 +115,10 @@ public abstract class FacebookProvider implements 
StreamsProvider {
     }
 
     @Override
-    public boolean isRunning() {
-        return !this.isComplete.get();
-    }
-
-    @Override
     public void prepare(Object configurationObject) {
         this.datums = Queues.newLinkedBlockingQueue();
         this.isComplete = new AtomicBoolean(false);
-        this.executor = Executors.newFixedThreadPool(1);
+        this.executor = 
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
     }
 
     @Override
@@ -138,4 +141,14 @@ public abstract class FacebookProvider implements 
StreamsProvider {
         }
         this.configuration.setIds(ids);
     }
+
+    @Override
+    public boolean isRunning() {
+        if (datums.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone()) {
+            LOGGER.info("Completed");
+            isComplete.set(true);
+            LOGGER.info("Exiting");
+        }
+        return !isComplete.get();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
index bfbb227..dea17e1 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
@@ -17,20 +17,58 @@
  */
 package org.apache.streams.facebook.provider.page;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.facebook.FacebookConfiguration;
 import org.apache.streams.facebook.provider.FacebookDataCollector;
 import org.apache.streams.facebook.provider.FacebookProvider;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
- * Streams Provider which aggregates Page Profiles based on the ID List 
contained in the
+ * Streams Provider which collects Page Profiles in the ID List contained in 
the
  * FacebookConfiguration object
+ *
+ * To use from command line:
+ *
+ *  Supply (at least) the following required configuration in application.conf:
+ *
+ *  facebook.oauth.appId
+ *  facebook.oauth.appSecret
+ *  facebook.oauth.userAccessToken
+ *
+ *  Launch using:
+ *
+ *  mvn exec:java 
-Dexec.mainClass=org.apache.streams.facebook.provider.page.FacebookPageProvider 
-Dexec.args="application.conf pages.json"
+
  */
 public class FacebookPageProvider extends FacebookProvider {
 
+    public static final String STREAMS_ID = "FacebookPageProvider";
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FacebookPageProvider.class);
+
+    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
     public FacebookPageProvider(FacebookConfiguration facebookConfiguration) {
         super(facebookConfiguration);
     }
@@ -44,4 +82,43 @@ public class FacebookPageProvider extends FacebookProvider {
     protected FacebookDataCollector getDataCollector() {
         return new FacebookPageDataCollector(super.datums, 
super.configuration);
     }
+
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2);
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        assert(conf_file.exists());
+        Config conf = ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = conf.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
+        FacebookConfiguration config = new 
ComponentConfigurator<>(FacebookConfiguration.class).detectConfiguration(typesafe,
 "facebook");
+        FacebookPageProvider provider = new FacebookPageProvider(config);
+
+        PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+        provider.prepare(config);
+        provider.startStream();
+        do {
+            
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+            Iterator<StreamsDatum> iterator = 
provider.readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                try {
+                    json = MAPPER.writeValueAsString(datum.getDocument());
+                    outStream.println(json);
+                } catch (JsonProcessingException e) {
+                    System.err.println(e.getMessage());
+                }
+            }
+        } while( provider.isRunning());
+        provider.cleanUp();
+        outStream.flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
index db7983f..308b129 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/pagefeed/FacebookPageFeedProvider.java
@@ -17,15 +17,92 @@
  */
 package org.apache.streams.facebook.provider.pagefeed;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.facebook.FacebookConfiguration;
 import org.apache.streams.facebook.provider.FacebookDataCollector;
 import org.apache.streams.facebook.provider.FacebookProvider;
+import org.apache.streams.facebook.provider.page.FacebookPageProvider;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
 
 /**
  *
  */
 public class FacebookPageFeedProvider extends FacebookProvider {
+
+    public static final String STREAMS_ID = "FacebookPageFeedProvider";
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FacebookPageProvider.class);
+
+    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    public FacebookPageFeedProvider() {
+        super();
+    }
+
+    public FacebookPageFeedProvider(FacebookConfiguration config) {
+        super(config);
+    }
+
     @Override
     protected FacebookDataCollector getDataCollector() {
         return new FacebookPageFeedDataCollector(super.datums, 
super.configuration);
     }
+
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2);
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        assert(conf_file.exists());
+        Config conf = ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = conf.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
+        FacebookConfiguration config = new 
ComponentConfigurator<>(FacebookConfiguration.class).detectConfiguration(typesafe,
 "facebook");
+        FacebookPageFeedProvider provider = new 
FacebookPageFeedProvider(config);
+
+        PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+        provider.prepare(config);
+        provider.startStream();
+        do {
+            
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+            Iterator<StreamsDatum> iterator = 
provider.readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                try {
+                    json = MAPPER.writeValueAsString(datum.getDocument());
+                    outStream.println(json);
+                } catch (JsonProcessingException e) {
+                    System.err.println(e.getMessage());
+                }
+            }
+        } while( provider.isRunning());
+        provider.cleanUp();
+        outStream.flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/main/resources/facebook.conf
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/resources/facebook.conf 
b/streams-contrib/streams-provider-facebook/src/main/resources/facebook.conf
deleted file mode 100644
index 13a8339..0000000
--- a/streams-contrib/streams-provider-facebook/src/main/resources/facebook.conf
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/resources/reference.conf 
b/streams-contrib/streams-provider-facebook/src/main/resources/reference.conf
new file mode 100644
index 0000000..d57da0b
--- /dev/null
+++ 
b/streams-contrib/streams-provider-facebook/src/main/resources/reference.conf
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+facebook.version = "v2.8"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/site/markdown/index.md 
b/streams-contrib/streams-provider-facebook/src/site/markdown/index.md
index 8fbe217..b80f2f0 100644
--- a/streams-contrib/streams-provider-facebook/src/site/markdown/index.md
+++ b/streams-contrib/streams-provider-facebook/src/site/markdown/index.md
@@ -15,10 +15,37 @@ streams-provider-facebook contains schema definitions, 
providers, conversions, a
 |--------|
 | 
[FacebookConfiguration.json](org/apache/streams/facebook/FacebookConfiguration.json
 "FacebookConfiguration.json") 
[FacebookConfiguration.html](apidocs/org/apache/streams/facebook/FacebookConfiguration.html
 "javadoc") |
 
-## Example
+## Components
 
 ![components](components.dot.svg "Components")
 
+Test:
+-----
+
+Create a local file `facebook.conf` with valid Facebook credentials
+
+    facebook {
+      oauth {
+        appId = ""
+        appSecret = ""
+      }
+      userAccessTokens = [
+        ""
+      ]
+    }
+    
+Build with integration testing enabled, using your credentials
+
+    mvn clean test verify -DskipITs=false 
-DargLine="-Dconfig.file=`pwd`/facebook.conf"
+
+Confirm that you can get data from testing endpoints with Graph API Explorer:
+ 
+ - ${page-id}
+ - ${page-id}/feed
+
+[https://developers.facebook.com/tools/explorer/](https://developers.facebook.com/tools/explorer/
 "https://developers.facebook.com/tools/explorer/")
+  
+  
 [JavaDocs](apidocs/index.html "JavaDocs")
 
 ###### Licensed under Apache License 2.0 - 
http://www.apache.org/licenses/LICENSE-2.0

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/page/TestFacebookPageProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/page/TestFacebookPageProvider.java
 
b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/page/TestFacebookPageProvider.java
deleted file mode 100644
index fd4a3b9..0000000
--- 
a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/page/TestFacebookPageProvider.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.facebook.provider.page;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import facebook4j.Page;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.facebook.FacebookConfiguration;
-import org.apache.streams.facebook.FacebookOAuthConfiguration;
-import org.apache.streams.facebook.IdConfig;
-import org.joda.time.DateTime;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-/**
- * Unit Tests For {@link org.apache.streams.facebook.provider.FacebookProvider}
- */
-public class TestFacebookPageProvider {
-    final int DEFAULT_WAIT_PERIOD = 5000; //5 seconds in milliseconds
-
-    @Test
-    public void testPageProviderCollector() {
-        FacebookPageProvider facebookPageProvider = new 
FacebookPageProvider(new FacebookConfiguration());
-        assertEquals(facebookPageProvider.getDataCollector().getClass(), 
FacebookPageDataCollector.class);
-    }
-
-    @Test
-    public void testPageProviderEmpty() throws Exception {
-        Map<IdConfig, Page> pageMap = createPageMap();
-
-        testPageProvider(pageMap);
-    }
-
-    @Test
-    public void testPageProviderSingleDocument() throws Exception {
-        Map<IdConfig, Page> pageMap = createPageMap("fake_id_1");
-
-        testPageProvider(pageMap);
-    }
-
-    @Test
-    public void testPageProviderMultipleDocuments() throws Exception {
-        Map<IdConfig, Page> pageMap = createPageMap("fake_id_1", "fake_id_2", 
"fake_id_3", "fake_id_4", "fake_id_5");
-
-        testPageProvider(pageMap);
-    }
-
-    /**
-     * Given a map of Mocked IdConfig->Page combinations, run the 
FacebookPageProvider end to end
-     * and assert that correct behavior is occurring
-     *
-     * @param pageMap
-     * @throws Exception
-     */
-    private void testPageProvider(Map<IdConfig, Page> pageMap) throws 
Exception {
-        FacebookConfiguration facebookConfiguration = 
createFacebookConfiguration(pageMap);
-
-        FacebookPageProvider facebookPageProvider = spy(new 
FacebookPageProvider(facebookConfiguration));
-        facebookPageProvider.prepare(null);
-
-        FacebookPageDataCollector dataCollector = 
createDataCollector(facebookPageProvider.getQueue(), facebookConfiguration, 
pageMap);
-
-        doReturn(dataCollector).when(facebookPageProvider).getDataCollector();
-
-        assertFalse(dataCollector.isComplete());
-
-        facebookPageProvider.startStream();
-
-        Thread.sleep(DEFAULT_WAIT_PERIOD);
-        StreamsResultSet streamsDatums = facebookPageProvider.readCurrent();
-
-        assertEquals(pageMap.size(), streamsDatums.size());
-        assertTrue(containsExpectedDocuments(pageMap, streamsDatums));
-
-        assertTrue(dataCollector.isComplete());
-    }
-
-    /**
-     * Iterates through both the pageMap and returned datums from the provider
-     * and determines whether or not all expected documents are present
-     *
-     * @param pageMap
-     * @param streamsDatums
-     * @return
-     */
-    private boolean containsExpectedDocuments(Map<IdConfig, Page> pageMap, 
StreamsResultSet streamsDatums) {
-        boolean containsAll = true;
-
-        List<String> pageMapIds = Lists.newArrayList();
-        for(IdConfig pageMapId : pageMap.keySet()) {
-            pageMapIds.add(pageMapId.getId());
-        }
-
-        for(StreamsDatum datum : streamsDatums.getQueue()) {
-            if(!pageMapIds.contains(datum.getId())) {
-                containsAll = false;
-                break;
-            }
-
-            pageMapIds.remove(datum.getId());
-        }
-
-        return containsAll;
-    }
-
-    /**
-     * Creates a Mocked FacebookConfiguration using the IDs declared in the 
passed
-     * in pageMap
-     *
-     * @param pageMap
-     * @return
-     */
-    private FacebookConfiguration createFacebookConfiguration(Map<IdConfig, 
Page> pageMap) {
-        FacebookConfiguration facebookConfiguration = new 
FacebookConfiguration();
-        facebookConfiguration.setIds(Sets.newHashSet(pageMap.keySet()));
-        facebookConfiguration.setOauth(new FacebookOAuthConfiguration());
-
-        return facebookConfiguration;
-    }
-
-    /**
-     * Given an indeterminately sized list of IDs, create a Map containing
-     * Mocked Pages to be used in the tests
-     *
-     * @param ids
-     * @return
-     */
-    private Map<IdConfig, Page> createPageMap(String... ids) {
-        Map<IdConfig, Page> map = Maps.newHashMap();
-
-        for(String id : ids) {
-            IdConfig idConfig = new IdConfig()
-                    .withBeforeDate(new DateTime())
-                    .withId(id);
-            map.put(idConfig, new TestPage(id, id + " Name"));
-        }
-
-        return map;
-    }
-
-    /**
-     * Given a BlockingQueue, Mocked FacebookConfiguration and Map containing 
IdConfig objects and Mocked Pages,
-     * create a FacebookPageDataCollector which outputs those pages to the 
passed in Queue
-     *
-     * @param queue
-     * @param facebookConfiguration
-     * @param pages
-     * @return
-     */
-    private FacebookPageDataCollector 
createDataCollector(BlockingQueue<StreamsDatum> queue, FacebookConfiguration 
facebookConfiguration, Map<IdConfig, Page> pages) {
-        final Map<IdConfig, Page> finalPages = pages;
-
-        FacebookPageDataCollector dataCollector = new 
FacebookPageDataCollector(queue, facebookConfiguration) {
-            @Override
-            protected void getData(IdConfig id) throws Exception {
-                super.outputData(finalPages.get(id), 
finalPages.get(id).getId());
-            }
-        };
-
-        return dataCollector;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/page/TestPage.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/page/TestPage.java
 
b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/page/TestPage.java
deleted file mode 100644
index ac13c58..0000000
--- 
a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/page/TestPage.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.facebook.provider.page;
-
-import facebook4j.Cover;
-import facebook4j.Page;
-import facebook4j.Place;
-
-import java.net.URL;
-import java.util.Date;
-
-public class TestPage implements Page {
-    private String id;
-    private String name;
-
-    public TestPage(String id, String name) {
-        this.id = id;
-        this.name = name;
-    }
-
-    @Override
-    public String getId() {
-        return id;
-    }
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    @Override
-    public URL getLink() {
-        return null;
-    }
-
-    @Override
-    public String getCategory() {
-        return null;
-    }
-
-    @Override
-    public Boolean isPublished() {
-        return null;
-    }
-
-    @Override
-    public Boolean canPost() {
-        return null;
-    }
-
-    @Override
-    public Integer getLikes() {
-        return null;
-    }
-
-    @Override
-    public Place.Location getLocation() {
-        return null;
-    }
-
-    @Override
-    public String getPhone() {
-        return null;
-    }
-
-    @Override
-    public Integer getCheckins() {
-        return null;
-    }
-
-    @Override
-    public URL getPicture() {
-        return null;
-    }
-
-    @Override
-    public Cover getCover() {
-        return null;
-    }
-
-    @Override
-    public String getWebsite() {
-        return null;
-    }
-
-    @Override
-    public Integer getTalkingAboutCount() {
-        return null;
-    }
-
-    @Override
-    public String getAccessToken() {
-        return null;
-    }
-
-    @Override
-    public Boolean isCommunityPage() {
-        return null;
-    }
-
-    @Override
-    public Integer getWereHereCount() {
-        return null;
-    }
-
-    @Override
-    public Date getCreatedTime() {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/pagefeed/TestFacebookProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/pagefeed/TestFacebookProvider.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/pagefeed/TestFacebookProvider.java
deleted file mode 100644
index a7ad49d..0000000
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/provider/pagefeed/TestFacebookProvider.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.facebook.provider.pagefeed;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.facebook.FacebookConfiguration;
-import org.apache.streams.facebook.IdConfig;
-import org.apache.streams.facebook.provider.FacebookDataCollector;
-import org.apache.streams.facebook.provider.FacebookProvider;
-import org.junit.Test;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CyclicBarrier;
-
-import static org.junit.Assert.*;
-
-/**
- * Unit Tests For {@link org.apache.streams.facebook.provider.FacebookProvider}
- */
-public class TestFacebookProvider {
-
-    @Test
-    public void testFacebookProvider() throws Exception {
-        //Test that streams starts and shut downs.
-        final CyclicBarrier barrier = new CyclicBarrier(2);
-        FacebookProvider provider = new FacebookProvider(new FacebookConfiguration()) {
-            @Override
-            protected FacebookDataCollector getDataCollector() {
-                return new TestFacebookDataCollector(barrier, super.configuration, super.datums);
-            }
-        };
-        provider.prepare(null);
-        provider.startStream();
-        assertTrue(provider.isRunning());
-        barrier.await();
-        assertTrue(provider.isRunning());
-        assertEquals(5, provider.readCurrent().size());
-        barrier.await();
-        assertEquals(0, provider.readCurrent().size());
-        assertFalse(provider.isRunning());
-        provider.cleanUp();
-    }
-
-    private class TestFacebookDataCollector extends FacebookDataCollector {
-
-        private CyclicBarrier barrier;
-        private BlockingQueue<StreamsDatum> queue;
-
-        public TestFacebookDataCollector(CyclicBarrier barrier, FacebookConfiguration config, BlockingQueue<StreamsDatum> queue) {
-            super(config, queue);
-            this.barrier = barrier;
-            this.queue = queue;
-
-        }
-
-        @Override
-        protected void getData(IdConfig id) throws Exception {
-
-        }
-
-        @Override
-        public void run() {
-            try {
-                for(int i=0; i < 5; ++i) {
-                    super.outputData(new Integer(i), ""+i);
-                }
-                this.barrier.await();
-                super.isComplete.set(true);
-                this.barrier.await();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            } catch (BrokenBarrierException bbe) {
-                fail();
-            }
-        }
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookActivityActorSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookActivityActorSerDeIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookActivityActorSerDeIT.java
deleted file mode 100644
index 18b6489..0000000
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookActivityActorSerDeIT.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.facebook.test;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.input.BoundedInputStream;
-import org.apache.streams.facebook.Page;
-import org.apache.streams.facebook.api.FacebookPageActivitySerializer;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-
-/**
- * Tests conversion of Facebook Page inputs to Actor
- */
-public class FacebookActivityActorSerDeIT {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(FacebookActivityActorSerDeIT.class);
-    private FacebookPageActivitySerializer serializer = new FacebookPageActivitySerializer();
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    @Test
-    public void Tests() throws Exception
-    {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
-        InputStream is = FacebookActivityActorSerDeIT.class.getResourceAsStream("/testpage.json");
-        Joiner joiner = Joiner.on(" ").skipNulls();
-        is = new BoundedInputStream(is, 10000);
-        String json;
-
-        json = joiner.join(IOUtils.readLines(is));
-        LOGGER.debug(json);
-
-        Page page = mapper.readValue(json, Page.class);
-
-        Activity activity = serializer.deserialize(page);
-
-        LOGGER.debug(mapper.writeValueAsString(activity));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookActivitySerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookActivitySerDeIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookActivitySerDeIT.java
deleted file mode 100644
index b5d8df0..0000000
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookActivitySerDeIT.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.facebook.test;
-
-import org.apache.streams.facebook.api.FacebookPostActivitySerializer;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.input.BoundedInputStream;
-import org.apache.streams.facebook.Post;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-
-/**
- * Tests conversion of Facebook Post inputs to Activity
- */
-public class FacebookActivitySerDeIT {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(FacebookActivitySerDeIT.class);
-    private FacebookPostActivitySerializer serializer = new FacebookPostActivitySerializer();
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    @Test
-    public void Tests()
-    {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
-        InputStream is = FacebookActivitySerDeIT.class.getResourceAsStream("/testpost.json");
-        Joiner joiner = Joiner.on(" ").skipNulls();
-        is = new BoundedInputStream(is, 10000);
-        String json;
-
-        try {
-            json = joiner.join(IOUtils.readLines(is));
-            LOGGER.debug(json);
-
-            Post post = mapper.readValue(json, Post.class);
-
-            Activity activity = serializer.deserialize(post);
-
-            LOGGER.debug(mapper.writeValueAsString(activity));
-
-        } catch( Exception e ) {
-            LOGGER.error("Exception: ", e);
-            Assert.fail();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPageSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPageSerDeIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPageSerDeIT.java
deleted file mode 100644
index bba30f2..0000000
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPageSerDeIT.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.facebook.test;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.input.BoundedInputStream;
-import org.apache.streams.facebook.Page;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-
-/**
- * Tests serialization of Facebook Page inputs
- */
-public class FacebookPageSerDeIT {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(FacebookPageSerDeIT.class);
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    @Test
-    public void Tests()
-    {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
-        InputStream is = FacebookPageSerDeIT.class.getResourceAsStream("/testpage.json");
-        Joiner joiner = Joiner.on(" ").skipNulls();
-        is = new BoundedInputStream(is, 10000);
-        String json;
-
-        try {
-            json = joiner.join(IOUtils.readLines(is));
-            LOGGER.debug(json);
-
-            Page ser = mapper.readValue(json, Page.class);
-
-            String de = mapper.writeValueAsString(ser);
-
-            LOGGER.debug(de);
-
-            Page serde = mapper.readValue(de, Page.class);
-
-            Assert.assertEquals(ser, serde);
-
-            LOGGER.debug(mapper.writeValueAsString(serde));
-
-        } catch( Exception e ) {
-            LOGGER.error("Exception: ", e);
-            Assert.fail();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPostSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPostSerDeIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPostSerDeIT.java
deleted file mode 100644
index 46db1b3..0000000
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/FacebookPostSerDeIT.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.facebook.test;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.input.BoundedInputStream;
-import org.apache.streams.facebook.Post;
-import org.apache.streams.facebook.serializer.FacebookActivityUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * Tests serialization of Facebook Post inputs
- */
-public class FacebookPostSerDeIT {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(FacebookPostSerDeIT.class);
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    @Test
-    public void Tests()
-    {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
-        InputStream is = FacebookPostSerDeIT.class.getResourceAsStream("/testpost.json");
-        Joiner joiner = Joiner.on(" ").skipNulls();
-        is = new BoundedInputStream(is, 10000);
-        String json;
-
-        try {
-            json = joiner.join(IOUtils.readLines(is));
-            LOGGER.debug(json);
-
-            Post ser = mapper.readValue(json, Post.class);
-
-            String de = mapper.writeValueAsString(ser);
-
-            LOGGER.debug(de);
-
-            Post serde = mapper.readValue(de, Post.class);
-
-            Assert.assertEquals(ser, serde);
-
-            LOGGER.debug(mapper.writeValueAsString(serde));
-
-            Activity activity = new Activity();
-            FacebookActivityUtil.updateActivity(ser, activity);
-
-            assertNotNull(activity);
-            assertNotNull(activity.getActor().getId());
-            assertNotNull(activity.getActor().getDisplayName());
-            assertNotNull(activity.getId());
-            assert(activity.getVerb().equals("post"));
-            assertNotNull(activity.getObject());
-            assertNotNull(activity.getUpdated());
-            assertNotNull(activity.getPublished());
-            assertEquals(activity.getProvider().getId(), "id:providers:facebook");
-            assertEquals(activity.getProvider().getDisplayName(), "Facebook");
-            assertEquals(activity.getLinks().size(), 1);
-            assertNotNull(activity.getAdditionalProperties().get("extensions"));
-
-        } catch( Exception e ) {
-            LOGGER.error("Exception: ", e);
-            Assert.fail();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/TestPage.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/TestPage.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/TestPage.java
new file mode 100644
index 0000000..d3a00b6
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/TestPage.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.facebook.test;
+
+import facebook4j.Cover;
+import facebook4j.Page;
+import facebook4j.Place;
+
+import java.net.URL;
+import java.util.Date;
+
+public class TestPage implements Page {
+    private String id;
+    private String name;
+
+    public TestPage(String id, String name) {
+        this.id = id;
+        this.name = name;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public URL getLink() {
+        return null;
+    }
+
+    @Override
+    public String getCategory() {
+        return null;
+    }
+
+    @Override
+    public Boolean isPublished() {
+        return null;
+    }
+
+    @Override
+    public Boolean canPost() {
+        return null;
+    }
+
+    @Override
+    public Integer getLikes() {
+        return null;
+    }
+
+    @Override
+    public Place.Location getLocation() {
+        return null;
+    }
+
+    @Override
+    public String getPhone() {
+        return null;
+    }
+
+    @Override
+    public Integer getCheckins() {
+        return null;
+    }
+
+    @Override
+    public URL getPicture() {
+        return null;
+    }
+
+    @Override
+    public Cover getCover() {
+        return null;
+    }
+
+    @Override
+    public String getWebsite() {
+        return null;
+    }
+
+    @Override
+    public String getCompanyOverview() {
+        return null;
+    }
+
+    @Override
+    public Integer getTalkingAboutCount() {
+        return null;
+    }
+
+    @Override
+    public String getAccessToken() {
+        return null;
+    }
+
+    @Override
+    public Boolean isCommunityPage() {
+        return null;
+    }
+
+    @Override
+    public Integer getWereHereCount() {
+        return null;
+    }
+
+    @Override
+    public Integer getFanCount() {
+        return null;
+    }
+
+    @Override
+    public Date getCreatedTime() {
+        return null;
+    }
+
+    @Override
+    public String getAbout() {
+        return null;
+    }
+
+    @Override
+    public String getUsername() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivityActorSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivityActorSerDeIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivityActorSerDeIT.java
new file mode 100644
index 0000000..fff060a
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivityActorSerDeIT.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.test.data;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.streams.facebook.Page;
+import org.apache.streams.facebook.api.FacebookPageActivitySerializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+
+/**
+ * Tests conversion of Facebook Page inputs to Actor
+ */
+public class FacebookActivityActorSerDeIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(FacebookActivityActorSerDeIT.class);
+    private FacebookPageActivitySerializer serializer = new FacebookPageActivitySerializer();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    @Test
+    public void Tests() throws Exception
+    {
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+        InputStream is = FacebookActivityActorSerDeIT.class.getResourceAsStream("/testpage.json");
+        Joiner joiner = Joiner.on(" ").skipNulls();
+        is = new BoundedInputStream(is, 10000);
+        String json;
+
+        json = joiner.join(IOUtils.readLines(is));
+        LOGGER.debug(json);
+
+        Page page = mapper.readValue(json, Page.class);
+
+        Activity activity = serializer.deserialize(page);
+
+        LOGGER.debug(mapper.writeValueAsString(activity));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivitySerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivitySerDeIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivitySerDeIT.java
new file mode 100644
index 0000000..3d9e3e1
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookActivitySerDeIT.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.test.data;
+
+import org.apache.streams.facebook.api.FacebookPostActivitySerializer;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.streams.facebook.Post;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+
+/**
+ * Tests conversion of Facebook Post inputs to Activity
+ */
+public class FacebookActivitySerDeIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(FacebookActivitySerDeIT.class);
+    private FacebookPostActivitySerializer serializer = new FacebookPostActivitySerializer();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    @Test
+    public void Tests()
+    {
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+        InputStream is = FacebookActivitySerDeIT.class.getResourceAsStream("/testpost.json");
+        Joiner joiner = Joiner.on(" ").skipNulls();
+        is = new BoundedInputStream(is, 10000);
+        String json;
+
+        try {
+            json = joiner.join(IOUtils.readLines(is));
+            LOGGER.debug(json);
+
+            Post post = mapper.readValue(json, Post.class);
+
+            Activity activity = serializer.deserialize(post);
+
+            LOGGER.debug(mapper.writeValueAsString(activity));
+
+        } catch( Exception e ) {
+            LOGGER.error("Exception: ", e);
+            Assert.fail();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPageSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPageSerDeIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPageSerDeIT.java
new file mode 100644
index 0000000..3499a67
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPageSerDeIT.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.test.data;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.streams.facebook.Page;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+
+/**
+ * Tests serialization of Facebook Page inputs
+ */
+public class FacebookPageSerDeIT {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageSerDeIT.class);
+
+    /** Upper bound, in bytes, on how much of the fixture is read. */
+    private static final long MAX_FIXTURE_BYTES = 10000;
+
+    private final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    /**
+     * Round-trips the {@code /testpage.json} fixture: JSON to {@link Page}, back to
+     * JSON, and to {@link Page} again, then checks that both pages are equal.
+     *
+     * @throws Exception if the fixture cannot be read or (de)serialized; the
+     *                   exception is propagated so the test report shows the real cause
+     */
+    @Test
+    public void Tests() throws Exception
+    {
+        // NOTE(review): getInstance() presumably returns a shared mapper, in which case
+        // these settings leak into other tests - confirm.
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+        InputStream resource = FacebookPageSerDeIT.class.getResourceAsStream("/testpage.json");
+        // Fail with a readable message rather than an NPE when the fixture is not on the classpath.
+        Assert.assertNotNull("missing test resource /testpage.json", resource);
+
+        String json;
+        // try-with-resources closes the bounded stream (and the wrapped resource) on every path.
+        try (InputStream is = new BoundedInputStream(resource, MAX_FIXTURE_BYTES)) {
+            // Explicit charset so the result does not depend on the platform default.
+            json = Joiner.on(" ").skipNulls().join(IOUtils.readLines(is, "UTF-8"));
+        }
+        LOGGER.debug(json);
+
+        Page ser = mapper.readValue(json, Page.class);
+
+        String de = mapper.writeValueAsString(ser);
+        LOGGER.debug(de);
+
+        Page serde = mapper.readValue(de, Page.class);
+
+        Assert.assertEquals(ser, serde);
+
+        LOGGER.debug(mapper.writeValueAsString(serde));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
 
b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
new file mode 100644
index 0000000..f877d56
--- /dev/null
+++ 
b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.test.data;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.streams.facebook.Post;
+import org.apache.streams.facebook.serializer.FacebookActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests serialization of Facebook Post inputs
+ */
+public class FacebookPostSerDeIT {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPostSerDeIT.class);
+
+    /** Upper bound, in bytes, on how much of the fixture is read. */
+    private static final long MAX_FIXTURE_BYTES = 10000;
+
+    private final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    /**
+     * Round-trips the {@code /testpost.json} fixture through {@link Post}, then
+     * populates an {@link Activity} from the post and checks its key fields.
+     *
+     * @throws Exception if the fixture cannot be read, (de)serialized or converted;
+     *                   the exception is propagated so the test report shows the real cause
+     */
+    @Test
+    public void Tests() throws Exception
+    {
+        // NOTE(review): getInstance() presumably returns a shared mapper, in which case
+        // these settings leak into other tests - confirm.
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+        InputStream resource = FacebookPostSerDeIT.class.getResourceAsStream("/testpost.json");
+        // Fail with a readable message rather than an NPE when the fixture is not on the classpath.
+        assertNotNull("missing test resource /testpost.json", resource);
+
+        String json;
+        // try-with-resources closes the bounded stream (and the wrapped resource) on every path.
+        try (InputStream is = new BoundedInputStream(resource, MAX_FIXTURE_BYTES)) {
+            // Explicit charset so the result does not depend on the platform default.
+            json = Joiner.on(" ").skipNulls().join(IOUtils.readLines(is, "UTF-8"));
+        }
+        LOGGER.debug(json);
+
+        Post ser = mapper.readValue(json, Post.class);
+
+        String de = mapper.writeValueAsString(ser);
+        LOGGER.debug(de);
+
+        Post serde = mapper.readValue(de, Post.class);
+
+        Assert.assertEquals(ser, serde);
+
+        LOGGER.debug(mapper.writeValueAsString(serde));
+
+        Activity activity = new Activity();
+        FacebookActivityUtil.updateActivity(ser, activity);
+
+        // JUnit assertions throughout: the Java `assert` keyword is skipped unless the
+        // JVM runs with -ea. Expected value goes first so failure messages read correctly.
+        assertNotNull(activity.getActor().getId());
+        assertNotNull(activity.getActor().getDisplayName());
+        assertNotNull(activity.getId());
+        assertEquals("post", activity.getVerb());
+        assertNotNull(activity.getObject());
+        assertNotNull(activity.getUpdated());
+        assertNotNull(activity.getPublished());
+        assertEquals("id:providers:facebook", activity.getProvider().getId());
+        assertEquals("Facebook", activity.getProvider().getDisplayName());
+        assertEquals(1, activity.getLinks().size());
+        assertNotNull(activity.getAdditionalProperties().get("facebook"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/TestFacebookProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/TestFacebookProvider.java
 
b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/TestFacebookProvider.java
new file mode 100644
index 0000000..8416186
--- /dev/null
+++ 
b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/TestFacebookProvider.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.facebook.test.providers;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.facebook.FacebookConfiguration;
+import org.apache.streams.facebook.IdConfig;
+import org.apache.streams.facebook.provider.FacebookDataCollector;
+import org.apache.streams.facebook.provider.FacebookProvider;
+import org.junit.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit Tests For {@link org.apache.streams.facebook.provider.FacebookProvider}
+ */
+public class TestFacebookProvider {
+
+    /** Number of datums the stub collector emits before meeting the test thread at the barrier. */
+    private static final int DATUM_COUNT = 5;
+
+    /**
+     * Checks that the provider starts, exposes the datums produced by its collector
+     * through {@code readCurrent()}, and reports not running once the collector has
+     * marked itself complete and the queue is drained.
+     *
+     * <p>The timeout turns a collector that never reaches the barrier into a test
+     * failure instead of a hung build.
+     *
+     * @throws Exception if the barrier is broken or the test thread is interrupted
+     */
+    @Test(timeout = 30000)
+    public void testFacebookProvider() throws Exception {
+        // Two parties: this test thread and the stub collector.
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+        FacebookProvider provider = new FacebookProvider(new FacebookConfiguration()) {
+            @Override
+            protected FacebookDataCollector getDataCollector() {
+                return new TestFacebookDataCollector(barrier, super.configuration, super.datums);
+            }
+        };
+        provider.prepare(null);
+        provider.startStream();
+        assertTrue(provider.isRunning());
+
+        // First rendezvous: the collector has emitted its datums but is not yet complete.
+        barrier.await();
+        assertTrue(provider.isRunning());
+        assertEquals(DATUM_COUNT, provider.readCurrent().size());
+
+        // Second rendezvous: the collector has flagged completion and nothing is left to read.
+        barrier.await();
+        assertEquals(0, provider.readCurrent().size());
+        assertFalse(provider.isRunning());
+        provider.cleanUp();
+    }
+
+    /**
+     * Stub collector that emits {@link #DATUM_COUNT} datums and then synchronizes
+     * with the test thread through a shared barrier. Static because it needs no
+     * state from the enclosing test instance.
+     */
+    private static class TestFacebookDataCollector extends FacebookDataCollector {
+
+        private final CyclicBarrier barrier;
+
+        public TestFacebookDataCollector(CyclicBarrier barrier, FacebookConfiguration config, BlockingQueue<StreamsDatum> queue) {
+            super(config, queue);
+            this.barrier = barrier;
+        }
+
+        /** Not used: {@link #run()} is overridden and produces the data itself. */
+        @Override
+        protected void getData(IdConfig id) throws Exception {
+
+        }
+
+        @Override
+        public void run() {
+            try {
+                for (int i = 0; i < DATUM_COUNT; ++i) {
+                    super.outputData(Integer.valueOf(i), String.valueOf(i));
+                }
+                this.barrier.await();
+                super.isComplete.set(true);
+                this.barrier.await();
+            } catch (InterruptedException e) {
+                // Restore the flag so whoever owns this thread can see the interruption.
+                Thread.currentThread().interrupt();
+            } catch (BrokenBarrierException bbe) {
+                // This failure is raised on the collector's thread; the test thread waiting
+                // on the same barrier gets its own BrokenBarrierException and fails there.
+                fail("barrier was broken while the collector was waiting");
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/page/FacebookPageProviderIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/page/FacebookPageProviderIT.java
 
b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/page/FacebookPageProviderIT.java
new file mode 100644
index 0000000..dea1584
--- /dev/null
+++ 
b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/page/FacebookPageProviderIT.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
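+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at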
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.test.providers.page;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.facebook.provider.page.FacebookPageProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+public class FacebookPageProviderIT {
+
+    /**
+     * Runs {@link FacebookPageProvider#main(String[])} with a config file path and an
+     * output file path, then checks that the output file exists, is a readable
+     * regular file, and contains at least one line.
+     *
+     * @throws Exception if the provider fails or the output file cannot be read
+     */
+    @Test
+    public void testFacebookPageProvider() throws Exception {
+
+        String configfile = "./target/test-classes/FacebookPageProviderIT.conf";
+        String outfile = "./target/test-classes/FacebookPageProviderIT.stdout.txt";
+
+        FacebookPageProvider.main(new String[] {configfile, outfile});
+
+        // JUnit assertions rather than the `assert` keyword, which is skipped unless
+        // the JVM runs with -ea. Fully qualified because this file imports only org.junit.Test.
+        File out = new File(outfile);
+        org.junit.Assert.assertTrue(outfile + " should exist", out.exists());
+        org.junit.Assert.assertTrue(outfile + " should be readable", out.canRead());
+        org.junit.Assert.assertTrue(outfile + " should be a regular file", out.isFile());
+
+        int lineCount;
+        // try-with-resources closes the reader even when reading fails.
+        try (LineNumberReader outCounter = new LineNumberReader(new FileReader(out))) {
+            while (outCounter.readLine() != null) {
+                // Drain the file; only the resulting line number matters.
+            }
+            lineCount = outCounter.getLineNumber();
+        }
+
+        org.junit.Assert.assertTrue("expected at least 1 line in " + outfile + " but found " + lineCount,
+            lineCount >= 1);
+
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/pagefeed/FacebookPageFeedProviderIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/pagefeed/FacebookPageFeedProviderIT.java
 
b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/pagefeed/FacebookPageFeedProviderIT.java
new file mode 100644
index 0000000..091fc6c
--- /dev/null
+++ 
b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/pagefeed/FacebookPageFeedProviderIT.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
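+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at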
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.test.providers.pagefeed;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.facebook.provider.pagefeed.FacebookPageFeedProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+public class FacebookPageFeedProviderIT {
+
+    /** Minimum number of output lines the run is expected to produce. */
+    private static final int MIN_EXPECTED_LINES = 1000;
+
+    /**
+     * Runs {@link FacebookPageFeedProvider#main(String[])} with a config file path and
+     * an output file path, then checks that the output file exists, is a readable
+     * regular file, and contains at least {@link #MIN_EXPECTED_LINES} lines.
+     *
+     * @throws Exception if the provider fails or the output file cannot be read
+     */
+    @Test
+    public void testFacebookPageFeedProvider() throws Exception {
+
+        String configfile = "./target/test-classes/FacebookPageFeedProviderIT.conf";
+        String outfile = "./target/test-classes/FacebookPageFeedProviderIT.stdout.txt";
+
+        FacebookPageFeedProvider.main(new String[] {configfile, outfile});
+
+        // JUnit assertions rather than the `assert` keyword, which is skipped unless
+        // the JVM runs with -ea. Fully qualified because this file imports only org.junit.Test.
+        File out = new File(outfile);
+        org.junit.Assert.assertTrue(outfile + " should exist", out.exists());
+        org.junit.Assert.assertTrue(outfile + " should be readable", out.canRead());
+        org.junit.Assert.assertTrue(outfile + " should be a regular file", out.isFile());
+
+        int lineCount;
+        // try-with-resources closes the reader even when reading fails.
+        try (LineNumberReader outCounter = new LineNumberReader(new FileReader(out))) {
+            while (outCounter.readLine() != null) {
+                // Drain the file; only the resulting line number matters.
+            }
+            lineCount = outCounter.getLineNumber();
+        }
+
+        org.junit.Assert.assertTrue("expected at least " + MIN_EXPECTED_LINES + " lines in " + outfile
+            + " but found " + lineCount, lineCount >= MIN_EXPECTED_LINES);
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/resources/FacebookPageFeedProviderIT.conf
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/test/resources/FacebookPageFeedProviderIT.conf
 
b/streams-contrib/streams-provider-facebook/src/test/resources/FacebookPageFeedProviderIT.conf
new file mode 100644
index 0000000..5b7b32e
--- /dev/null
+++ 
b/streams-contrib/streams-provider-facebook/src/test/resources/FacebookPageFeedProviderIT.conf
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# Facebook ids handed to the provider by FacebookPageFeedProviderIT, which passes
+# this file as its config argument.
+facebook {
+  ids = [
+    {
+      # Alternative id value, left disabled.
+      #"id": "facebook"
+      # NOTE(review): presumably the numeric id of the same page as the disabled
+      # entry above - confirm.
+      "id": "20531316728"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/112fba08/streams-contrib/streams-provider-facebook/src/test/resources/FacebookPageProviderIT.conf
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/test/resources/FacebookPageProviderIT.conf
 
b/streams-contrib/streams-provider-facebook/src/test/resources/FacebookPageProviderIT.conf
new file mode 100644
index 0000000..5b7b32e
--- /dev/null
+++ 
b/streams-contrib/streams-provider-facebook/src/test/resources/FacebookPageProviderIT.conf
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# Facebook ids handed to the provider by FacebookPageProviderIT, which passes
+# this file as its config argument.
+facebook {
+  ids = [
+    {
+      # Alternative id value, left disabled.
+      #"id": "facebook"
+      # NOTE(review): presumably the numeric id of the same page as the disabled
+      # entry above - confirm.
+      "id": "20531316728"
+    }
+  ]
+}
\ No newline at end of file

Reply via email to