STREAMS-496: Remove twitter4j dependency from streams-provider-twitter

resolves #360
commit 05aa4ddc907c845fdbbfc9e8feb85cc0f9ecc2fb
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Mon Apr 17 17:04:24 2017 -0500

    upgrade hosebird client because why not

commit 3dc5f515fc067fbd50af431139b98c3bc408cff8
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Wed Apr 12 23:48:21 2017 -0500

    PR feedback, juneau target version bump

commit b67487a54f7e2b6676ab031f8ff0091393111f56
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Wed Apr 12 22:48:58 2017 -0500

    PR feedback - javadoc accuracy

commit 996c1b47e9dc485692dbc5a4c18f5754c697cd91
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Wed Apr 12 22:33:27 2017 -0500

    clean up

commit 8618abcad4e6087c0270ba8c54a201888071e0e8
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Tue Apr 11 16:45:11 2017 -0500

    retry logic

commit d7cd07110d602d24a07aad8e8f390ff0d98f1e19
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Mon Apr 3 15:11:32 2017 -0500

    checkstyle clean up

commit 069be4da745b9cd262159cca1982a9b52cb5cefc
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Mon Apr 3 14:56:01 2017 -0500

    all integration tests passing using juneau 6.1.0-incubating

commit 517b85f02e93c51d5fa36a704f7f0e0e2cdaed4e
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Mon Apr 3 11:01:25 2017 -0500

    oauth signing working

commit 06845f2dd848c91971c10f194f0d5503d5f4a964
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Tue Mar 28 13:20:03 2017 -0500

    return empty array not null on error

commit eddf003ba5b5555b10c3d51267c00225b6da4ef1
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Sun Mar 26 22:46:19 2017 -0500

    TwitterOAuthRequestInterceptorTest.testProcess

    TwitterOAuthRequestInterceptorTest.testProcess is pretty much directly off
    the twitter dev guidelines and tests the entire auth process at once.  passes.
    ITs still fail

commit c623f7139421049d5b536576c739b60e959cf11e
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Sun Mar 26 17:55:09 2017 -0500

    STREAMS-496: going through guide word-by-word and debugging line-by-line
    looking for mistakes

commit 5a07c57b553f8c87af1d42e0bbef4f0795f17bf7
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Sun Mar 26 17:44:37 2017 -0500

    STREAMS-496: more unit tests, all passing, ITs still fail

commit edc2662fd68b9e1e04c20b8721a44a6ee9fb36e6
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Sun Mar 26 16:20:10 2017 -0500

    adding unit testing to debug authentication

commit 1885090c6d683c8a9a6df7c64f933529378ee965
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Sun Mar 26 13:09:24 2017 -0500

    cleanup authentication.  debugging ‘Bad Authentication data.’

commit 434657439abc0dde2998a8cb213d8e8398f43ad6
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Sun Mar 26 12:56:34 2017 -0500

    first implementation of oauth signing

commit 0d172e76232e3d0bd48af8d6616716c444eb94f0
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Sun Mar 26 12:56:11 2017 -0500

    STREAMS-496: consolidate httpcomponents version selection

commit 78f30284caefb09e06818a9f435c3f380ee986a3
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Sat Mar 25 11:38:06 2017 -0500

    STREAMS-496: WIP - twitter4j gone, src/main/java compiles

commit c95bccd41b4f5f51d7e614e5927210458d1dbfbd
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Fri Mar 24 14:21:34 2017 -0500

    implementations using RestClient

commit 68d7976f928173dbd6de636c58d1a23dbe95eb15
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Fri Mar 24 13:39:52 2017 -0500

    STREAMS-496: additional pojos and interfaces needed to maintain features

commit a8fd796096296bb2e8af61b722c4a261acbba987
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Mon Mar 20 13:07:27 2017 -0500

    more WIP

commit 7dedbb42251e1c5214690e32f12a5efb5032dc92
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Date:   Mon Mar 20 01:35:33 2017 -0500

    STREAMS-496: WIP


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/67497a48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/67497a48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/67497a48

Branch: refs/heads/master
Commit: 67497a488a9417dcdbf702f707729928500c8d5c
Parents: 19caf33
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Authored: Wed Apr 19 14:49:38 2017 -0500
Committer: Steve Blackmon @steveblackmon <sblack...@apache.org>
Committed: Wed Apr 19 14:49:38 2017 -0500

----------------------------------------------------------------------
 pom.xml                                         |  14 +-
 streams-components/streams-http/pom.xml         |   1 -
 streams-contrib/streams-persist-riak/pom.xml    |   3 -
 .../streams-provider-twitter/pom.xml            |  25 +-
 .../apache/streams/twitter/api/Followers.java   |  52 ++
 .../org/apache/streams/twitter/api/Friends.java |  52 ++
 .../apache/streams/twitter/api/Statuses.java    |  62 +++
 .../org/apache/streams/twitter/api/Twitter.java | 521 +++++++++++++++++++
 .../api/TwitterOAuthRequestInterceptor.java     | 236 +++++++++
 .../twitter/api/TwitterRetryHandler.java        | 162 ++++++
 .../org/apache/streams/twitter/api/Users.java   |  51 ++
 .../converter/util/TwitterActivityUtil.java     |   6 +-
 .../FetchAndReplaceTwitterProcessor.java        | 107 ++--
 .../twitter/provider/TwitterErrorHandler.java   | 133 -----
 .../TwitterFollowersIdsProviderTask.java        | 124 +++++
 .../TwitterFollowersListProviderTask.java       | 120 +++++
 .../provider/TwitterFollowingProvider.java      | 188 +++++--
 .../provider/TwitterFollowingProviderTask.java  | 266 ----------
 .../provider/TwitterFriendsIdsProviderTask.java | 125 +++++
 .../TwitterFriendsListProviderTask.java         | 119 +++++
 .../provider/TwitterTimelineProvider.java       | 157 +++---
 .../provider/TwitterTimelineProviderTask.java   |  81 ++-
 .../TwitterUserInformationProvider.java         | 256 ++++-----
 .../TwitterUserInformationProviderTask.java     |  77 +++
 .../streams/twitter/TwitterConfiguration.json   |  87 ----
 .../twitter/TwitterFollowingConfiguration.json  |  33 --
 .../twitter/TwitterStreamConfiguration.json     |  45 --
 .../TwitterTimelineProviderConfiguration.json   |  23 -
 .../TwitterUserInformationConfiguration.json    |  25 -
 .../twitter/api/FollowersIdsRequest.json        |  13 +
 .../twitter/api/FollowersIdsResponse.json       |  32 ++
 .../twitter/api/FollowersListRequest.json       |  13 +
 .../twitter/api/FollowersListResponse.json      |  33 ++
 .../twitter/api/FollowingIdsRequest.json        |  36 ++
 .../twitter/api/FollowingListRequest.json       |  41 ++
 .../streams/twitter/api/FriendsIdsRequest.json  |  13 +
 .../streams/twitter/api/FriendsIdsResponse.json |  32 ++
 .../streams/twitter/api/FriendsListRequest.json |  13 +
 .../twitter/api/FriendsListResponse.json        |  33 ++
 .../twitter/api/StatusesLookupRequest.json      |  35 ++
 .../twitter/api/StatusesShowRequest.json        |  32 ++
 .../api/StatusesUserTimelineRequest.json        |  57 ++
 .../streams/twitter/api/UsersLookupRequest.json |  33 ++
 .../streams/twitter/api/UsersShowRequest.json   |  27 +
 .../twitter/config/TwitterConfiguration.json    |  92 ++++
 .../config/TwitterFollowingConfiguration.json   |  33 ++
 .../config/TwitterStreamConfiguration.json      |  45 ++
 .../TwitterTimelineProviderConfiguration.json   |  23 +
 .../TwitterUserInformationConfiguration.json    |  25 +
 .../api/TwitterOAuthRequestInterceptorTest.java | 124 +++++
 .../providers/TwitterTimelineProviderIT.java    |   2 +-
 streams-monitoring/pom.xml                      |   5 +-
 52 files changed, 2911 insertions(+), 1032 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e83dab2..d779fa9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -359,8 +359,8 @@
         <facebook4j.version>2.4.7</facebook4j.version>
         <mockito.version>1.10.19</mockito.version>
         <powermock.version>1.6.5</powermock.version>
-        <httpcomponents.core.version>4.4.5</httpcomponents.core.version>
-        <httpcomponents.client.version>4.5.2</httpcomponents.client.version>
+        <httpcomponents.core.version>4.4.6</httpcomponents.core.version>
+        <httpcomponents.client.version>4.5.3</httpcomponents.client.version>
         <doxia.version>1.7</doxia.version>
 
         <!-- osgi configuration -->
@@ -978,6 +978,16 @@
             <version>${commons-lang3.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>${httpcomponents.core.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpcomponents.client.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.typesafe</groupId>
             <artifactId>config</artifactId>
             <version>${typesafe.config.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-components/streams-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/pom.xml b/streams-components/streams-http/pom.xml
index 4d97bb6..603c02e 100644
--- a/streams-components/streams-http/pom.xml
+++ b/streams-components/streams-http/pom.xml
@@ -62,7 +62,6 @@
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
-            <version>${httpcomponents.client.version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>commons-logging</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-persist-riak/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/pom.xml b/streams-contrib/streams-persist-riak/pom.xml
index 3b24fc2..f96c2c3 100644
--- a/streams-contrib/streams-persist-riak/pom.xml
+++ b/streams-contrib/streams-persist-riak/pom.xml
@@ -30,8 +30,6 @@
     <description>Riak Module</description>
 
     <properties>
-        <httpcomponents.core.version>4.3.5</httpcomponents.core.version>
-        <httpcomponents.client.version>4.3.5</httpcomponents.client.version>
         <riak.version>2.0.6</riak.version>
     </properties>
 
@@ -58,7 +56,6 @@
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
-            <version>${httpcomponents.client.version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>commons-logging</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 91d2f88..6a0778d 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -90,22 +90,31 @@
         <dependency>
             <groupId>com.twitter</groupId>
             <artifactId>hbc-core</artifactId>
-            <version>2.1.0</version>
+            <version>2.2.0</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>jcl-over-slf4j</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.twitter4j</groupId>
-            <artifactId>twitter4j-core</artifactId>
-            <version>4.0.3</version>
+            <groupId>org.apache.juneau</groupId>
+            <artifactId>juneau-rest-client</artifactId>
+            <version>6.1.0-incubating</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
             <version>1.3</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
@@ -114,6 +123,12 @@
             <scope>test</scope>
             <type>test-jar</type>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Followers.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Followers.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Followers.java
new file mode 100644
index 0000000..5e04e6e
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Followers.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.api;
+
+import org.apache.streams.twitter.pojo.Tweet;
+
+import java.util.List;
+
+/**
+ * Returns user IDs or user objects for the users following the specified user.
+ *
+ * @see <a href="https://dev.twitter.com/rest/reference/get/followers/ids">https://dev.twitter.com/rest/reference/get/followers/ids</a>
+ */
+public interface Followers {
+
+  /**
+   * Returns a cursored collection of user IDs for every user following the specified user.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.FollowersIdsRequest}
+   * @return FollowersIdsResponse
+   * @see <a href="https://dev.twitter.com/rest/reference/get/followers/ids">https://dev.twitter.com/rest/reference/get/followers/ids</a>
+   *
+   */
+  public FollowersIdsResponse ids(FollowersIdsRequest parameters);
+
+  /**
+   * Returns a cursored collection of user objects for users following the specified user.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.FollowersListRequest}
+   * @return FollowersListResponse
+   * @see <a href="https://dev.twitter.com/rest/reference/get/followers/list">https://dev.twitter.com/rest/reference/get/followers/list</a>
+   *
+   */
+  public FollowersListResponse list(FollowersListRequest parameters);
+
+}
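
Note on the "cursored collection" wording in the javadoc above: the Twitter
REST API pages these results with a cursor. The first request uses cursor -1,
and each response carries a next_cursor that is 0 once the last page has been
read. Below is a minimal sketch of that loop. IdsPage, collectAll and the
fetchPage function are hypothetical stand-ins invented for illustration; they
are not the FollowersIdsRequest / FollowersIdsResponse types generated by this
commit, whose field names are not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.LongFunction;

/** Hypothetical stand-in for one page of a cursored ids response. */
final class IdsPage {
  final List<Long> ids;
  final long nextCursor;

  IdsPage(List<Long> ids, long nextCursor) {
    this.ids = ids;
    this.nextCursor = nextCursor;
  }
}

final class CursorPagingSketch {

  /**
   * Starts at cursor -1 and follows nextCursor until it is 0,
   * or until maxPages pages have been read (a guard against runaway loops).
   */
  static List<Long> collectAll(LongFunction<IdsPage> fetchPage, int maxPages) {
    List<Long> all = new ArrayList<>();
    long cursor = -1L;
    for (int page = 0; page < maxPages && cursor != 0L; page++) {
      IdsPage result = fetchPage.apply(cursor);
      all.addAll(result.ids);
      cursor = result.nextCursor;
    }
    return all;
  }

  public static void main(String[] args) {
    // Fake two-page service: the first page points at cursor 42, the second ends with 0.
    LongFunction<IdsPage> fake = cursor -> cursor == -1L
        ? new IdsPage(Arrays.asList(1L, 2L), 42L)
        : new IdsPage(Collections.singletonList(3L), 0L);
    System.out.println(collectAll(fake, 10)); // prints [1, 2, 3]
  }
}
```

In a real provider task the fetchPage function would presumably wrap a call
such as Followers.ids(...), with rate limiting handled by the caller.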

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Friends.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Friends.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Friends.java
new file mode 100644
index 0000000..d0d1f72
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Friends.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.api;
+
+import org.apache.streams.twitter.pojo.Tweet;
+
+import java.util.List;
+
+/**
+ * Returns user IDs or user objects for the users the specified user is following.
+ *
+ * @see <a href="https://dev.twitter.com/rest/reference/get/friends/ids">https://dev.twitter.com/rest/reference/get/friends/ids</a>
+ */
+public interface Friends {
+
+  /**
+   * Returns a cursored collection of user IDs for every user the specified user is following.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.FriendsIdsRequest}
+   * @return FriendsIdsResponse
+   * @see <a href="https://dev.twitter.com/rest/reference/get/friends/ids">https://dev.twitter.com/rest/reference/get/friends/ids</a>
+   *
+   */
+  public FriendsIdsResponse ids(FriendsIdsRequest parameters);
+
+  /**
+   * Returns a cursored collection of user objects for every user the specified user is following.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.FriendsListRequest}
+   * @return FriendsListResponse
+   * @see <a href="https://dev.twitter.com/rest/reference/get/friends/list">https://dev.twitter.com/rest/reference/get/friends/list</a>
+   *
+   */
+  public FriendsListResponse list(FriendsListRequest parameters);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Statuses.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Statuses.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Statuses.java
new file mode 100644
index 0000000..c9945b0
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Statuses.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.api;
+
+import org.apache.streams.twitter.pojo.Tweet;
+
+import java.util.List;
+
+/**
+ * Returns a collection of the most recent Tweets posted by the user indicated by the screen_name or user_id parameters.
+ *
+ * @see <a href="https://dev.twitter.com/rest/reference/get/statuses/user_timeline">https://api.twitter.com/1.1/statuses/user_timeline.json</a>
+ */
+public interface Statuses {
+
+  /**
+   * Returns fully-hydrated Tweet objects for up to 100 Tweets per request, as specified by comma-separated values passed to the id parameter.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.StatusesLookupRequest}
+   * @return List<Tweet>
+   * @see <a href="https://dev.twitter.com/rest/reference/get/statuses/lookup">https://dev.twitter.com/rest/reference/get/statuses/lookup</a>
+   *
+   */
+  public List<Tweet> lookup(StatusesLookupRequest parameters);
+
+  /**
+   * Returns a single Tweet, specified by the id parameter. The Tweet’s author will also be embedded within the Tweet.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.StatusesShowRequest}
+   * @return Tweet
+   * @see <a href="https://dev.twitter.com/rest/reference/get/statuses/show/id">https://dev.twitter.com/rest/reference/get/statuses/show/id</a>
+   *
+   */
+  public Tweet show(StatusesShowRequest parameters);
+
+  /**
+   * Returns a collection of the most recent Tweets posted by the user indicated by the screen_name or user_id parameters.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.StatusesUserTimelineRequest}
+   * @return List<Tweet>
+   * @see <a href="https://dev.twitter.com/rest/reference/get/statuses/user_timeline">https://dev.twitter.com/rest/reference/get/statuses/user_timeline</a>
+   *
+   */
+  public List<Tweet> userTimeline(StatusesUserTimelineRequest parameters);
+
+}
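
Note on the lookup javadoc above: since the endpoint accepts at most 100 ids
per request as one comma-separated id parameter, a caller with a longer id
list has to split it into batches first. Below is a minimal sketch of that
batching step. LookupBatchSketch and toIdParameters are hypothetical names
used only for illustration and are not part of this commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

final class LookupBatchSketch {

  /** Limit stated in the statuses/lookup javadoc: up to 100 Tweets per request. */
  static final int MAX_IDS_PER_REQUEST = 100;

  /** Splits ids into comma-separated strings holding at most 100 ids each. */
  static List<String> toIdParameters(List<Long> ids) {
    List<String> batches = new ArrayList<>();
    for (int start = 0; start < ids.size(); start += MAX_IDS_PER_REQUEST) {
      int end = Math.min(start + MAX_IDS_PER_REQUEST, ids.size());
      batches.add(ids.subList(start, end).stream()
          .map(String::valueOf)
          .collect(Collectors.joining(",")));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Long> ids = LongStream.rangeClosed(1, 250).boxed().collect(Collectors.toList());
    List<String> batches = toIdParameters(ids);
    System.out.println(batches.size());                    // prints 3
    System.out.println(batches.get(2).split(",").length);  // prints 50
  }
}
```

Each resulting string could then be sent as the id value of one lookup request.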

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
new file mode 100644
index 0000000..f4570d6
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.api;
+
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.TwitterConfiguration;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.provider.TwitterProviderUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpRequestInterceptor;
+import org.apache.http.client.HttpRequestRetryHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.params.ClientPNames;
+import org.apache.http.client.params.CookiePolicy;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.juneau.json.JsonParser;
+import org.apache.juneau.parser.ParseException;
+import org.apache.juneau.plaintext.PlainTextSerializer;
+import org.apache.juneau.rest.client.RestCall;
+import org.apache.juneau.rest.client.RestCallException;
+import org.apache.juneau.rest.client.RestClient;
+//import org.apache.juneau.rest.client.RestClientBuilder;
+import org.apache.juneau.rest.client.RetryOn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of all twitter interfaces using juneau.
+ */
+public class Twitter implements Followers, Friends, Statuses, Users {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Twitter.class);
+
+  private static Map<TwitterConfiguration, Twitter> INSTANCE_MAP = new ConcurrentHashMap<>();
+
+  private TwitterConfiguration configuration;
+
+  private ObjectMapper mapper;
+
+  private String rootUrl;
+
+  private CloseableHttpClient httpclient;
+
+  private HttpRequestInterceptor oauthInterceptor;
+
+  RestClient restClient;
+
+  private Twitter(TwitterConfiguration configuration) throws InstantiationException {
+    this.configuration = configuration;
+    this.rootUrl = TwitterProviderUtil.baseUrl(configuration);
+    this.oauthInterceptor = new TwitterOAuthRequestInterceptor(configuration.getOauth());
+    this.httpclient = HttpClientBuilder.create()
+        .addInterceptorFirst(oauthInterceptor)
+        .setDefaultRequestConfig(RequestConfig.custom()
+          .setConnectionRequestTimeout(5000)
+          .setConnectTimeout(5000)
+          .setSocketTimeout(5000)
+          .setCookieSpec("easy")
+          .build()
+        )
+        .setMaxConnPerRoute(20)
+        .setMaxConnTotal(100)
+        .build();
+
+//  TODO: juneau-6.3.x-incubating
+//  this.restClient = new RestClientBuilder()
+//        .httpClient(httpclient, true)
+//        .parser(JsonParser.class)
+//        .rootUrl(rootUrl)
+//        .retryable(
+//            configuration.getRetryMax().intValue(),
+//            configuration.getRetrySleepMs(),
+//            new TwitterRetryHandler())
+//        .build();
+    this.restClient = new RestClient()
+        .setHttpClient(httpclient)
+        .setParser(JsonParser.class)
+        .setRootUrl(rootUrl);
+
+    this.mapper = StreamsJacksonMapper.getInstance();
+  }
+
+  public static Twitter getInstance(TwitterConfiguration configuration) throws InstantiationException {
+    if (INSTANCE_MAP.containsKey(configuration) && INSTANCE_MAP.get(configuration) != null) {
+      return INSTANCE_MAP.get(configuration);
+    } else {
+      Twitter twitter = new Twitter(configuration);
+      INSTANCE_MAP.put(configuration, twitter);
+      return INSTANCE_MAP.get(configuration);
+    }
+  }
+
+  @Override
+  public List<Tweet> userTimeline(StatusesUserTimelineRequest parameters) {
+    try {
+//  TODO: juneau-6.3.x-incubating
+//      Statuses restStatuses = restClient.getRemoteableProxy("/statuses/user_timeline.json", Statuses.class);
+//      List<Tweet> result = restStatuses.userTimeline(parameters);
+//      return result;
+      URIBuilder uriBuilder = new URIBuilder()
+          .setPath("/statuses/user_timeline.json");
+      if( Objects.nonNull(parameters.getUserId()) && StringUtils.isNotBlank(parameters.getUserId().toString())) {
+        uriBuilder.addParameter("user_id", parameters.getUserId().toString());
+      }
+      if( StringUtils.isNotBlank(parameters.getScreenName())) {
+        uriBuilder.addParameter("screen_name", parameters.getScreenName());
+      }
+      if( Objects.nonNull(parameters.getSinceId()) && StringUtils.isNotBlank(parameters.getSinceId().toString())) {
+        uriBuilder.addParameter("since_id", parameters.getSinceId().toString());
+      }
+      if( Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) {
+        uriBuilder.addParameter("count", parameters.getCount().toString());
+      }
+      if( Objects.nonNull(parameters.getMaxId()) && StringUtils.isNotBlank(parameters.getMaxId().toString())) {
+        uriBuilder.addParameter("max_id", parameters.getMaxId().toString());
+      }
+      if( Objects.nonNull(parameters.getTrimUser()) && StringUtils.isNotBlank(parameters.getTrimUser().toString())) {
+        uriBuilder.addParameter("trim_user", parameters.getTrimUser().toString());
+      }
+      if( Objects.nonNull(parameters.getExcludeReplies()) && StringUtils.isNotBlank(parameters.getExcludeReplies().toString())) {
+        uriBuilder.addParameter("exclude_replies", parameters.getExcludeReplies().toString());
+      }
+      if( Objects.nonNull(parameters.getContributorDetails()) && StringUtils.isNotBlank(parameters.getContributorDetails().toString())) {
+        uriBuilder.addParameter("contributor_details", parameters.getContributorDetails().toString());
+      }
+      if( Objects.nonNull(parameters.getIncludeRts()) && StringUtils.isNotBlank(parameters.getIncludeRts().toString())) {
+        uriBuilder.addParameter("include_rts", parameters.getIncludeRts().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        ArrayNode resultArrayNode = mapper.readValue(restResponseEntity, ArrayNode.class);
+        List<Tweet> result = new ArrayList<>();
+        resultArrayNode.iterator().forEachRemaining(item -> result.add(mapper.convertValue(item, Tweet.class)));
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return new ArrayList<>();
+  }
+
+  @Override
+  public List<Tweet> lookup(StatusesLookupRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Statuses restStatuses = 
restClient.getRemoteableProxy("/statuses/lookup.json", Statuses.class);
+//      List<Tweet> result = restStatuses.lookup(parameters);
+//      return result;
+    String ids = StringUtils.join(parameters.getId(), ',');
+    try {
+      URIBuilder uriBuilder = new URIBuilder()
+          .setPath("/statuses/lookup.json");
+      if( Objects.nonNull(parameters.getId()) && 
StringUtils.isNotBlank(parameters.getId().toString())) {
+        uriBuilder.addParameter("id", parameters.getId().toString());
+      }
+      if( Objects.nonNull(parameters.getTrimUser()) && StringUtils.isNotBlank(parameters.getTrimUser().toString())) {
+        uriBuilder.addParameter("trim_user", parameters.getTrimUser().toString());
+      }
+      if( Objects.nonNull(parameters.getIncludeEntities()) && StringUtils.isNotBlank(parameters.getIncludeEntities().toString())) {
+        uriBuilder.addParameter("include_entities", parameters.getIncludeEntities().toString());
+      }
+      if( Objects.nonNull(parameters.getMap()) && StringUtils.isNotBlank(parameters.getMap().toString())) {
+        uriBuilder.addParameter("map", parameters.getMap().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        ArrayNode resultArrayNode = mapper.readValue(restResponseEntity, ArrayNode.class);
+        List<Tweet> result = new ArrayList<>();
+        resultArrayNode.iterator().forEachRemaining(item -> result.add(mapper.convertValue(item, Tweet.class)));
+        //List<Tweet> result = restCall.getResponse(LinkedList.class, Tweet.class);
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return new ArrayList<>();
+  }
+
+  @Override
+  public Tweet show(StatusesShowRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Statuses restStatuses = restClient.getRemoteableProxy("/statuses/show.json", Statuses.class);
+//      Tweet result = restStatuses.show(parameters);
+//      return result;
+    try {
+      URIBuilder uriBuilder = new URIBuilder()
+          .setPath("/statuses/show.json");
+      if (Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) {
+        uriBuilder.addParameter("id", parameters.getId().toString());
+      }
+      if (Objects.nonNull(parameters.getTrimUser()) && StringUtils.isNotBlank(parameters.getTrimUser().toString())) {
+        uriBuilder.addParameter("trim_user", parameters.getTrimUser().toString());
+      }
+      if (Objects.nonNull(parameters.getIncludeEntities()) && StringUtils.isNotBlank(parameters.getIncludeEntities().toString())) {
+        uriBuilder.addParameter("include_entities", parameters.getIncludeEntities().toString());
+      }
+      if (Objects.nonNull(parameters.getIncludeMyRetweet()) && StringUtils.isNotBlank(parameters.getIncludeMyRetweet().toString())) {
+        uriBuilder.addParameter("include_my_retweet", parameters.getIncludeMyRetweet().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        //Tweet result = restCall.getResponse(Tweet.class);
+        Tweet result = mapper.readValue(restResponseEntity, Tweet.class);
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return null;
+  }
+
+  @Override
+  public FriendsIdsResponse ids(FriendsIdsRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Friends restFriends = restClient.getRemoteableProxy("/friends/ids.json", Friends.class);
+//      FriendsIdsResponse result = restFriends.ids(parameters);
+//      return result;
+    try {
+      URIBuilder uriBuilder = new URIBuilder()
+          .setPath("/friends/ids.json");
+      if( Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) {
+        uriBuilder.addParameter("count", parameters.getCount().toString());
+      }
+      if( Objects.nonNull(parameters.getCursor()) && StringUtils.isNotBlank(parameters.getCursor().toString())) {
+        uriBuilder.addParameter("cursor", parameters.getCursor().toString());
+      }
+      if( Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) {
+        uriBuilder.addParameter("id", parameters.getId().toString());
+      }
+      if( StringUtils.isNotBlank(parameters.getScreenName())) {
+        uriBuilder.addParameter("screen_name", parameters.getScreenName());
+      }
+      if( Objects.nonNull(parameters.getStringifyIds()) && StringUtils.isNotBlank(parameters.getStringifyIds().toString())) {
+        uriBuilder.addParameter("stringify_ids", parameters.getStringifyIds().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        //FriendsIdsResponse result = restCall.getResponse(FriendsIdsResponse.class);
+        FriendsIdsResponse result = mapper.readValue(restResponseEntity, FriendsIdsResponse.class);
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return null;
+  }
+
+  @Override
+  public FriendsListResponse list(FriendsListRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Friends restFriends = restClient.getRemoteableProxy("/friends/list.json", Friends.class);
+//      FriendsListResponse result = restFriends.list(parameters);
+//      return result;
+    try {
+      URIBuilder uriBuilder = new URIBuilder()
+          .setPath("/friends/list.json");
+      if (Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) {
+        uriBuilder.addParameter("count", parameters.getCount().toString());
+      }
+      if (Objects.nonNull(parameters.getCursor()) && StringUtils.isNotBlank(parameters.getCursor().toString())) {
+        uriBuilder.addParameter("cursor", parameters.getCursor().toString());
+      }
+      if (Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) {
+        uriBuilder.addParameter("id", parameters.getId().toString());
+      }
+      if (Objects.nonNull(parameters.getIncludeUserEntities()) && StringUtils.isNotBlank(parameters.getIncludeUserEntities().toString())) {
+        uriBuilder.addParameter("include_user_entities", parameters.getIncludeUserEntities().toString());
+      }
+      if (StringUtils.isNotBlank(parameters.getScreenName())) {
+        uriBuilder.addParameter("screen_name", parameters.getScreenName());
+      }
+      if (Objects.nonNull(parameters.getSkipStatus()) && StringUtils.isNotBlank(parameters.getSkipStatus().toString())) {
+        uriBuilder.addParameter("skip_status", parameters.getSkipStatus().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        //FriendsListResponse result = restCall.getResponse(FriendsListResponse.class);
+        FriendsListResponse result = mapper.readValue(restResponseEntity, FriendsListResponse.class);
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    }catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return null;
+  }
+
+  @Override
+  public FollowersIdsResponse ids(FollowersIdsRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Followers restFollowers = restClient.getRemoteableProxy("/followers/ids.json", Followers.class);
+//      FollowersIdsResponse result = restFollowers.ids(parameters);
+//      return result;
+    try {
+      URIBuilder uriBuilder = new URIBuilder()
+          .setPath("/followers/ids.json");
+      if (Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) {
+        uriBuilder.addParameter("count", parameters.getCount().toString());
+      }
+      if (Objects.nonNull(parameters.getCursor()) && StringUtils.isNotBlank(parameters.getCursor().toString())) {
+        uriBuilder.addParameter("cursor", parameters.getCursor().toString());
+      }
+      if (Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) {
+        uriBuilder.addParameter("id", parameters.getId().toString());
+      }
+      if (StringUtils.isNotBlank(parameters.getScreenName())) {
+        uriBuilder.addParameter("screen_name", parameters.getScreenName());
+      }
+      if (Objects.nonNull(parameters.getStringifyIds()) && StringUtils.isNotBlank(parameters.getStringifyIds().toString())) {
+        uriBuilder.addParameter("stringify_ids", parameters.getStringifyIds().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        //FollowersIdsResponse result = restCall.getResponse(FollowersIdsResponse.class);
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        FollowersIdsResponse result = mapper.readValue(restResponseEntity, FollowersIdsResponse.class);
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return null;
+  }
+
+  @Override
+  public FollowersListResponse list(FollowersListRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Followers restFollowers = restClient.getRemoteableProxy("/followers/list.json", Followers.class);
+//      FollowersListResponse result = restFollowers.list(parameters);
+//      return result;
+    try {
+      URIBuilder uriBuilder =
+          new URIBuilder()
+              .setPath("/followers/list.json");
+      if (Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) {
+        uriBuilder.addParameter("count", parameters.getCount().toString());
+      }
+      if (Objects.nonNull(parameters.getCursor()) && StringUtils.isNotBlank(parameters.getCursor().toString())) {
+        uriBuilder.addParameter("cursor", parameters.getCursor().toString());
+      }
+      if (Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) {
+        uriBuilder.addParameter("id", parameters.getId().toString());
+      }
+      if (Objects.nonNull(parameters.getIncludeUserEntities()) && StringUtils.isNotBlank(parameters.getIncludeUserEntities().toString())) {
+        uriBuilder.addParameter("include_user_entities", parameters.getIncludeUserEntities().toString());
+      }
+      if (StringUtils.isNotBlank(parameters.getScreenName())) {
+        uriBuilder.addParameter("screen_name", parameters.getScreenName());
+      }
+      if (Objects.nonNull(parameters.getSkipStatus()) && StringUtils.isNotBlank(parameters.getSkipStatus().toString())) {
+        uriBuilder.addParameter("skip_status", parameters.getSkipStatus().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        //FollowersListResponse result = restCall.getResponse(FollowersListResponse.class);
+        FollowersListResponse result = mapper.readValue(restResponseEntity, FollowersListResponse.class);
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    }catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return null;
+  }
+
+  @Override
+  public List<User> lookup(UsersLookupRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Users restUsers = restClient.getRemoteableProxy("/users/lookup.json", Users.class);
+//      List<User> result = restUsers.lookup(parameters);
+//      return result;
+    String user_ids = StringUtils.join(parameters.getUserId(), ',');
+    String screen_names = StringUtils.join(parameters.getScreenName(), ',');
+    try {
+      URIBuilder uriBuilder =
+          new URIBuilder()
+              .setPath("/users/lookup.json");
+      if (Objects.nonNull(parameters.getIncludeEntities()) && StringUtils.isNotBlank(parameters.getIncludeEntities().toString())) {
+        uriBuilder.addParameter("include_entities", parameters.getIncludeEntities().toString());
+      }
+      if (Objects.nonNull(screen_names) && StringUtils.isNotBlank(screen_names)) {
+        uriBuilder.addParameter("screen_name", screen_names);
+      }
+      if (Objects.nonNull(user_ids) && StringUtils.isNotBlank(user_ids)) {
+        uriBuilder.addParameter("user_id", user_ids);
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+//      List<User> result = restCall.getResponse(LinkedList.class, User.class);
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        ArrayNode resultArrayNode = mapper.readValue(restResponseEntity, ArrayNode.class);
+        List<User> result = new ArrayList<>();
+        resultArrayNode.iterator().forEachRemaining(item -> result.add(mapper.convertValue(item, User.class)));
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return new ArrayList<>();
+  }
+
+  @Override
+  public User show(UsersShowRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Users restUsers = restClient.getRemoteableProxy("/users/show.json", Users.class);
+//      User result = restUsers.show(parameters);
+//      return result;
+    try {
+      URIBuilder uriBuilder =
+          new URIBuilder()
+              .setPath("/users/show.json");
+      if (Objects.nonNull(parameters.getIncludeEntities()) && StringUtils.isNotBlank(parameters.getIncludeEntities().toString())) {
+        uriBuilder.addParameter("include_entities", parameters.getIncludeEntities().toString());
+      }
+      if (Objects.nonNull(parameters.getScreenName()) && StringUtils.isNotBlank(parameters.getScreenName())) {
+        uriBuilder.addParameter("screen_name", parameters.getScreenName());
+      }
+      if (Objects.nonNull(parameters.getUserId()) && StringUtils.isNotBlank(parameters.getUserId().toString())) {
+        uriBuilder.addParameter("user_id", parameters.getUserId().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        User result = mapper.readValue(restResponseEntity, User.class);
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return null;
+  }
+
+}
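
Review note: each method above repeats the same guard, adding a query parameter only when its value is non-null and non-blank. A minimal sketch of how that guard could be factored out follows. The class `QueryParamsSketch` and the method `addIfPresent` are hypothetical names, not part of this commit, and a plain `Map` stands in for `URIBuilder` so the example stays self-contained. `trim().isEmpty()` only approximates `StringUtils.isNotBlank`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper; not part of this commit. */
class QueryParamsSketch {

  /** Adds name=value only when value is non-null and its string form is not blank. */
  static void addIfPresent(Map<String, String> params, String name, Object value) {
    if (value == null) {
      return;
    }
    String text = value.toString();
    if (!text.trim().isEmpty()) {
      params.put(name, text);
    }
  }

  public static void main(String[] args) {
    Map<String, String> params = new LinkedHashMap<>();
    addIfPresent(params, "count", 200);        // kept
    addIfPresent(params, "cursor", null);      // skipped: null
    addIfPresent(params, "screen_name", "  "); // skipped: blank
    System.out.println(params);
  }
}
```

With a helper of this shape, each request method would reduce to one call per parameter followed by the shared GET/retry/deserialize sequence.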

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterOAuthRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterOAuthRequestInterceptor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterOAuthRequestInterceptor.java
new file mode 100644
index 0000000..24c7b04
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterOAuthRequestInterceptor.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.api;
+
+import org.apache.streams.twitter.TwitterOAuthConfiguration;
+
+import org.apache.http.HttpException;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpRequestInterceptor;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestWrapper;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.misc.BASE64Encoder;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.security.GeneralSecurityException;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.StringJoiner;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.crypto.Mac;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+
+/**
+ * Handles request signing to api.twitter.com
+ */
+public class TwitterOAuthRequestInterceptor implements HttpRequestInterceptor {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterOAuthRequestInterceptor.class);
+
+  private static final String oauth_signature_encoding = "UTF-8";
+  private static final String oauth_signature_method = "HMAC-SHA1";
+  private static final String oauth_version = "1.0";
+
+  private static final BASE64Encoder base64Encoder = new BASE64Encoder();
+
+  TwitterOAuthConfiguration oAuthConfiguration;
+
+  public TwitterOAuthRequestInterceptor(TwitterOAuthConfiguration oAuthConfiguration) {
+    this.oAuthConfiguration = oAuthConfiguration;
+  }
+
+  @Override
+  public void process(HttpRequest httpRequest, HttpContext httpContext) throws HttpException, IOException {
+
+    String oauth_nonce = generateNonce();
+
+    String oauth_timestamp = generateTimestamp();
+
+    Map<String,String> oauthParamMap = new HashMap<>();
+    oauthParamMap.put("oauth_consumer_key", oAuthConfiguration.getConsumerKey());
+    oauthParamMap.put("oauth_nonce", oauth_nonce);
+    oauthParamMap.put("oauth_signature_method", oauth_signature_method);
+    oauthParamMap.put("oauth_timestamp", oauth_timestamp);
+    oauthParamMap.put("oauth_token", oAuthConfiguration.getAccessToken());
+    oauthParamMap.put("oauth_version", oauth_version);
+
+    String request_host = ((HttpRequestWrapper)httpRequest).getTarget().toString().replace(":443","");
+    String request_path = httpRequest.getRequestLine().getUri().substring(0, httpRequest.getRequestLine().getUri().indexOf('?'));
+    String request_param_line = httpRequest.getRequestLine().getUri().substring(httpRequest.getRequestLine().getUri().indexOf('?')+1);
+    String[] request_params = URLDecoder.decode(request_param_line, oauth_signature_encoding).split("&");
+
+    Map<String,String> allParamsMap = new HashMap<>(oauthParamMap);
+
+    for( String request_param : request_params ) {
+      String key = request_param.substring(0, request_param.indexOf('='));
+      String value = request_param.substring(request_param.indexOf('=')+1, request_param.length());
+      allParamsMap.put(key, value);
+    }
+
+    String[] body_params;
+    if( ((HttpRequestWrapper) httpRequest).getOriginal() instanceof HttpPost) {
+      String body = EntityUtils.toString(((HttpPost)((HttpRequestWrapper) httpRequest).getOriginal()).getEntity());
+      body_params = body.split(",");
+      for( String body_param : body_params ) {
+        String key = body_param.substring(0, body_param.indexOf('='));
+        String value = body_param.substring(body_param.indexOf('=')+1, body_param.length());
+        allParamsMap.put(key, value);
+      }
+    }
+
+    allParamsMap = encodeMap(allParamsMap);
+
+    String signature_parameter_string = generateSignatureParameterString(allParamsMap);
+
+    String signature_base_string = generateSignatureBaseString(((HttpRequestWrapper) httpRequest).getMethod(), request_host+request_path, signature_parameter_string);
+
+    String signing_key = encode(oAuthConfiguration.getConsumerSecret()) + "&" + encode(oAuthConfiguration.getAccessTokenSecret());
+
+    String oauth_signature;
+    try {
+      oauth_signature = computeSignature(signature_base_string, signing_key);
+    } catch (GeneralSecurityException e) {
+      LOGGER.warn("GeneralSecurityException", e);
+      return;
+    }
+
+    oauthParamMap.put("oauth_signature", oauth_signature);
+
+    String authorization_header_string = generateAuthorizationHeaderString(oauthParamMap);
+
+    httpRequest.setHeader("Authorization", authorization_header_string);
+
+  }
+
+  public String generateTimestamp() {
+    Calendar tempcal = Calendar.getInstance();
+    long ts = tempcal.getTimeInMillis();// get current time in milliseconds
+    String oauth_timestamp = Long.toString(ts / 1000);
+    return oauth_timestamp;
+  }
+
+  public String generateNonce() {
+    String uuid_string = UUID.randomUUID().toString();
+    uuid_string = uuid_string.replaceAll("-", "");
+    String oauth_nonce = base64Encoder.encode(uuid_string.getBytes());
+    return oauth_nonce;
+  }
+
+  public static Map<String, String> encodeMap(Map<String, String> map) {
+    Map<String,String> newMap = new HashMap<>();
+    for( String key : map.keySet() ) {
+      String value = map.get(key);
+      newMap.put(encode(key), encode(value));
+    }
+    return newMap;
+  }
+
+
+  public static String generateAuthorizationHeaderString(Map<String,String> oauthParamMap) {
+    SortedSet<String> sortedKeys = new TreeSet<>(oauthParamMap.keySet());
+
+    StringJoiner stringJoiner = new StringJoiner(", ");
+    for( String key : sortedKeys ) {
+      stringJoiner.add(encode(key)+"="+"\""+encode(oauthParamMap.get(key))+"\"");
+    }
+
+    String authorization_header_string = new StringBuilder()
+        .append("OAuth ")
+        .append(stringJoiner.toString())
+        .toString();
+    return authorization_header_string;
+  }
+
+  public static String generateSignatureBaseString(String method, String request_url, String signature_parameter_string) {
+    String signature_base_string = new StringBuilder()
+        .append(method)
+        .append("&")
+        .append(encode(request_url))
+        .append("&")
+        .append(encode(signature_parameter_string))
+        .toString();
+    return signature_base_string;
+  }
+
+  public static String generateSignatureParameterString(Map<String, String> allParamsMap) {
+
+    SortedSet<String> sortedKeys = new TreeSet<>(allParamsMap.keySet());
+
+    StringJoiner stringJoiner = new StringJoiner("&");
+    for( String key : sortedKeys ) {
+      stringJoiner.add(key+"="+allParamsMap.get(key));
+    }
+
+    return stringJoiner.toString();
+  }
+
+  public static String encode(String value)
+  {
+    String encoded = null;
+    try {
+      encoded = URLEncoder.encode(value, oauth_signature_encoding);
+    } catch (UnsupportedEncodingException ignore) {
+    }
+    StringBuilder buf = new StringBuilder(encoded.length());
+    char focus;
+    for (int i = 0; i < encoded.length(); i++) {
+      focus = encoded.charAt(i);
+      if (focus == '*') {
+        buf.append("%2A");
+      } else if (focus == '+') {
+        buf.append("%20");
+      } else if (focus == '%' && (i + 2) < encoded.length()
+          && encoded.charAt(i + 1) == '7' && encoded.charAt(i + 2) == 'E') {
+        buf.append('~');
+        i += 2;
+      } else {
+        buf.append(focus);
+      }
+    }
+    return buf.toString();
+  }
+
+  public static String computeSignature(String baseString, String keyString) throws GeneralSecurityException, UnsupportedEncodingException
+  {
+    SecretKey secretKey = null;
+
+    byte[] keyBytes = keyString.getBytes(oauth_signature_encoding);
+    secretKey = new SecretKeySpec(keyBytes, "HmacSHA1");
+
+    Mac mac = Mac.getInstance("HmacSHA1");
+    mac.init(secretKey);
+
+    byte[] text = baseString.getBytes(oauth_signature_encoding);
+
+    return new String(base64Encoder.encode(mac.doFinal(text))).trim();
+  }
+}
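
Review note: the interceptor above relies on `sun.misc.BASE64Encoder`, an internal JDK class outside the supported API. A minimal sketch of the same two steps (OAuth-style percent-encoding and HMAC-SHA1 signing) using only public JDK classes follows. The class `OAuthSigningSketch` and its method names are hypothetical and not part of this commit. The sketch assumes UTF-8 throughout.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical sketch; not part of this commit. */
class OAuthSigningSketch {

  /** Percent-encodes a value: space becomes %20, '*' becomes %2A, '~' stays literal. */
  static String percentEncode(String value) {
    try {
      return URLEncoder.encode(value, "UTF-8")
          .replace("+", "%20")   // URLEncoder emits '+' for a space
          .replace("*", "%2A")   // URLEncoder leaves '*' unescaped
          .replace("%7E", "~");  // URLEncoder escapes '~'
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException("UTF-8 not available", e);
    }
  }

  /** Computes HMAC-SHA1 over baseString with signingKey and Base64-encodes the digest. */
  static String hmacSha1Base64(String baseString, String signingKey) {
    try {
      Mac mac = Mac.getInstance("HmacSHA1");
      mac.init(new SecretKeySpec(signingKey.getBytes(StandardCharsets.UTF_8), "HmacSHA1"));
      byte[] digest = mac.doFinal(baseString.getBytes(StandardCharsets.UTF_8));
      return Base64.getEncoder().encodeToString(digest);
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException("HmacSHA1 not available", e);
    }
  }

  public static void main(String[] args) {
    // Placeholder secrets, for illustration only.
    String signingKey = percentEncode("consumer secret") + "&" + percentEncode("token secret");
    System.out.println(percentEncode("Ladies + Gentlemen"));
    System.out.println(hmacSha1Base64("GET&example&a%3D1", signingKey));
  }
}
```

`Base64.getEncoder()` never inserts line breaks, so the `trim()` used in `computeSignature` would not be needed with this approach.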

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterRetryHandler.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterRetryHandler.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterRetryHandler.java
new file mode 100644
index 0000000..3f7a853
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterRetryHandler.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.api;
+
+import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.BackOffException;
+import org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrategy;
+
+import org.apache.http.client.HttpRequestRetryHandler;
+import org.apache.http.protocol.HttpContext;
+import org.apache.juneau.rest.client.RetryOn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *  Handle expected and unexpected exceptions.
+ */
+public class TwitterRetryHandler implements RetryOn {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterRetryHandler.class);
+
+  private static AbstractBackOffStrategy backoff_strategy;
+
+// TODO: once request context is available, we can maintain multiple BackoffStrategy one per request path / params
+//  private static Map<String, AbstractBackOffStrategy> backoffs = new ConcurrentHashMap<>();
+
+// This is everything we used to check via twitter4j to decide whether to retry.
+//
+// @Deprecated
+//  public static int handleTwitterError(Twitter twitter, Exception exception) {
+//    return handleTwitterError( twitter, null, exception);
+//  }
+//
+//
+//  public static int handleTwitterError(Twitter twitter, Long id, Exception exception) {
+//
+//    if (exception instanceof TwitterException) {
+//      TwitterException twitterException = (TwitterException)exception;
+//
+//      if (twitterException.exceededRateLimitation()) {
+//
+//        long millisUntilReset = retry;
+//
+//        final RateLimitStatus rateLimitStatus = twitterException.getRateLimitStatus();
+//        if (rateLimitStatus != null) {
+//          millisUntilReset = rateLimitStatus.getSecondsUntilReset() * 1000;
+//        }
+//
+//        LOGGER.warn("Rate Limit Exceeded. Will retry in {} seconds...", millisUntilReset / 1000);
+//
+//        try {
+//          Thread.sleep(millisUntilReset);
+//        } catch (InterruptedException e1) {
+//          Thread.currentThread().interrupt();
+//        }
+//
+//        return 1;
+//      } else if (twitterException.isCausedByNetworkIssue()) {
+//        LOGGER.info("Twitter Network Issues Detected. Backing off...");
+//        LOGGER.info("{} - {}", twitterException.getExceptionCode(), twitterException.getLocalizedMessage());
+//        try {
+//          Thread.sleep(retry);
+//        } catch (InterruptedException e1) {
+//          Thread.currentThread().interrupt();
+//        }
+//        return 1;
+//      } else if (twitterException.isErrorMessageAvailable()) {
+//        if (twitterException.getMessage().toLowerCase().contains("does not exist")) {
+//          if ( id != null ) {
+//            LOGGER.warn("User does not exist: {}", id);
+//          } else {
+//            LOGGER.warn("User does not exist");
+//          }
+//          return (int)retryMax;
+//        } else {
+//          return (int)retryMax / 3;
+//        }
+//      } else {
+//        if (twitterException.getExceptionCode().equals("ced778ef-0c669ac0")) {
+//          // This is a known weird issue, not exactly sure the cause, but you'll never be able to get the data.
+//          return (int)retryMax / 3;
+//        } else if (twitterException.getExceptionCode().equals("4be80492-0a7bf7c7")) {
+//          // This is a 401 reflecting credentials don't have access to the requested resource.
+//          if ( id != null ) {
+//            LOGGER.warn("Authentication Exception accessing id: {}", id);
+//          } else {
+//            LOGGER.warn("Authentication Exception");
+//          }
+//          return (int)retryMax;
+//        } else {
+//          LOGGER.warn("Unknown Twitter Exception...");
+//          LOGGER.warn("   Access: {}", twitterException.getAccessLevel());
+//          LOGGER.warn("     Code: {}", twitterException.getExceptionCode());
+//          LOGGER.warn("  Message: {}", twitterException.getLocalizedMessage());
+//          return (int)retryMax / 10;
+//        }
+//      }
+//    } else if (exception instanceof RuntimeException) {
+//      LOGGER.warn("TwitterGrabber: Unknown Runtime Error", exception.getMessage());
+//      return (int)retryMax / 3;
+//    } else {
+//      LOGGER.info("Completely Unknown Exception: {}", exception);
+//      return (int)retryMax / 3;
+//    }
+//  }
+//  TODO: juneau 6.3.x-incubating
+//  @Override
+//  public boolean onCode(int httpResponseCode) {
+//
+//    LOGGER.warn("TwitterRetryHandler: {}", httpResponseCode);
+//
+//    if( httpResponseCode > 400 ) {
+//      return true;
+//    } else {
+//      return false;
+//    }
+//
+//  }
+
+  @Override
+  public boolean onCode(int httpResponseCode) {
+//    if( backoff_strategy == null ) {
+//      backoff_strategy = new LinearTimeBackOffStrategy(retrySleepMs / 1000, retryMax);
+//    }
+//    if( httpResponseCode > 400 ) {
+//      try {
+//        backoff_strategy.backOff();
+//        return true;
+//      } catch (BackOffException boe) {
+//        backoff_strategy.reset();
+//        return false;
+//      }
+//    } else {
+//      return false;
+//    }
+    if( httpResponseCode > 400 ) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+}
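
Note on the handler above: the commented-out branch in onCode points at bounded back-off, i.e. retry on an error status until an attempt budget is spent, then give up and reset. A minimal sketch of that idea follows. It does not use the Juneau or Streams back-off classes; LinearRetryBudget, shouldRetry, nextDelayMs and both constructor parameters are hypothetical names chosen for illustration, and actually sleeping is left to the caller.

```java
/** Hypothetical helper for illustration; not part of streams-provider-twitter. */
public class LinearRetryBudget {

  private final long sleepStepMs; // assumed: delay added per attempt already used
  private final int maxAttempts;  // assumed: retries allowed before giving up
  private int attempts = 0;

  public LinearRetryBudget(long sleepStepMs, int maxAttempts) {
    this.sleepStepMs = sleepStepMs;
    this.maxAttempts = maxAttempts;
  }

  /** Delay the caller should wait before the next attempt; grows linearly. */
  public long nextDelayMs() {
    return sleepStepMs * attempts;
  }

  /**
   * Mirrors the "> 400" check in the diff: ask for a retry on an error status
   * while budget remains; otherwise reset the counter and decline.
   */
  public boolean shouldRetry(int httpResponseCode) {
    if (httpResponseCode > 400 && attempts < maxAttempts) {
      attempts++;
      return true;
    }
    attempts = 0;
    return false;
  }

  public static void main(String[] args) {
    LinearRetryBudget budget = new LinearRetryBudget(1000L, 2);
    int[] observedCodes = {503, 503, 503, 200};
    for (int code : observedCodes) {
      boolean retry = budget.shouldRetry(code);
      System.out.println(code + " -> retry=" + retry + ", delayMs=" + budget.nextDelayMs());
    }
  }
}
```

Keeping the decision (shouldRetry) separate from the wait (nextDelayMs) lets the handler stay a pure function of the status code, which is easier to unit test than a handler that sleeps internally.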

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Users.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Users.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Users.java
new file mode 100644
index 0000000..5de9046
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Users.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.api;
+
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+
+import java.util.List;
+
+/**
+ * Retrieves Twitter user objects, one at a time or in bulk, as indicated by the user_id or screen_name parameters.
+ */
+public interface Users {
+
+  /**
+   * Returns fully-hydrated user objects for up to 100 users per request, as specified by comma-separated values passed to the user_id and/or screen_name parameters.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.UsersLookupRequest}
+   * @return List<User>
+   * @see <a href="https://dev.twitter.com/rest/reference/get/users/lookup">https://dev.twitter.com/rest/reference/get/users/lookup</a>
+   *
+   */
+  public List<User> lookup(UsersLookupRequest parameters);
+
+  /**
+   * Returns a variety of information about the user specified by the required user_id or screen_name parameter. The author’s most recent Tweet will be returned inline when possible.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.UsersShowRequest}
+   * @return User
+   * @see <a href="https://dev.twitter.com/rest/reference/get/users/show">https://dev.twitter.com/rest/reference/get/users/show</a>
+   *
+   */
+  public User show(UsersShowRequest parameters);
+
+}
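
Note on lookup above: since the Javadoc states a limit of 100 users per request, a caller holding a longer id list has to split it before calling lookup. The sketch below shows only that batching step and is independent of the Streams API; UserIdBatcher, partition and MAX_USERS_PER_LOOKUP are hypothetical names, and how each batch is turned into a UsersLookupRequest is not shown because that builder API is not visible in this diff.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of streams-provider-twitter. */
public class UserIdBatcher {

  /** Per-request limit taken from the lookup Javadoc in the diff. */
  public static final int MAX_USERS_PER_LOOKUP = 100;

  /** Splits ids into consecutive batches of at most batchSize elements. */
  public static List<List<Long>> partition(List<Long> ids, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<Long>> batches = new ArrayList<>();
    for (int start = 0; start < ids.size(); start += batchSize) {
      int end = Math.min(start + batchSize, ids.size());
      // Copy the view so each batch stays valid if the source list changes.
      batches.add(new ArrayList<>(ids.subList(start, end)));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Long> ids = new ArrayList<>();
    for (long i = 1; i <= 250; i++) {
      ids.add(i);
    }
    for (List<Long> batch : partition(ids, MAX_USERS_PER_LOOKUP)) {
      System.out.println("batch of " + batch.size() + " starting at id " + batch.get(0));
    }
  }
}
```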

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
index 6392e0d..6db16be 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
@@ -126,7 +126,7 @@ public class TwitterActivityUtil {
   }
 
   /**
-   * Builds the activity {@link ActivityObject} actor from the tweet
+   * Builds the activity {@link ActivityObject} actor from the tweet.
    * @param tweet the object to use as the source
    * @return a valid Actor populated from the Tweet
    */
@@ -138,7 +138,7 @@ public class TwitterActivityUtil {
   }
 
   /**
-   * Builds the activity {@link ActivityObject} actor from the User
+   * Builds the activity {@link ActivityObject} actor from the User.
    * @param user the object to use as the source
    * @return a valid Actor populated from the Tweet
    */
@@ -186,7 +186,7 @@ public class TwitterActivityUtil {
   }
 
   /**
-   * Creates an {@link ActivityObject} for the tweet
+   * Creates an {@link ActivityObject} for the tweet.
    * @param tweet the object to use as the source
    * @return a valid ActivityObject
    */

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
index 89a5abd..a170626 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
@@ -28,23 +28,20 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.TwitterConfiguration;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.api.StatusesShowRequest;
+import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterProviderUtil;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-import twitter4j.TwitterFactory;
-import twitter4j.TwitterObjectFactory;
-import twitter4j.conf.ConfigurationBuilder;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -89,7 +86,13 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor {
       Activity doc = (Activity)entry.getDocument();
       String originalId = doc.getId();
       if (PROVIDER_ID.equals(doc.getProvider().getId())) {
-        fetchAndReplace(doc, originalId);
+        try {
+          fetchAndReplace(doc, originalId);
+        } catch (ActivityConversionException ex) {
+          LOGGER.warn("ActivityConversionException", ex);
+        } catch (IOException ex) {
+          LOGGER.warn("IOException", ex);
+        }
       }
     } else {
       throw new IllegalStateException("Requires an activity document");
@@ -100,8 +103,14 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor {
 
   @Override
   public void prepare(Object configurationObject) {
-    this.client = getTwitterClient();
+    try {
+      client = getTwitterClient();
+    } catch (InstantiationException e) {
+      LOGGER.error("InstantiationException", e);
+    }
+    Objects.requireNonNull(client);
     this.mapper = StreamsJacksonMapper.getInstance();
+    Objects.requireNonNull(mapper);
   }
 
   @Override
@@ -109,22 +118,14 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor {
 
   }
 
-  protected void fetchAndReplace(Activity doc, String originalId) {
-    try {
-      String json = fetch(doc);
-      replace(doc, json);
-      doc.setId(originalId);
-      retryCount = 0;
-    } catch (TwitterException tw) {
-      if (tw.exceededRateLimitation()) {
-        sleepAndTryAgain(doc, originalId);
-      }
-    } catch (Exception ex) {
-      LOGGER.warn("Error fetching and replacing tweet for activity {}", doc.getId());
-    }
+  protected void fetchAndReplace(Activity doc, String originalId) throws java.io.IOException, ActivityConversionException {
+    Tweet tweet = fetch(doc);
+    replace(doc, tweet);
+    doc.setId(originalId);
   }
 
-  protected void replace(Activity doc, String json) throws java.io.IOException, ActivityConversionException {
+  protected void replace(Activity doc, Tweet tweet) throws java.io.IOException, ActivityConversionException {
+    String json = mapper.writeValueAsString(tweet);
     Class documentSubType = new TwitterDocumentClassifier().detectClasses(json).get(0);
     Object object = mapper.readValue(json, documentSubType);
 
@@ -137,52 +138,38 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor {
     }
   }
 
-  protected String fetch(Activity doc) throws TwitterException {
+  protected Tweet fetch(Activity doc) {
     String id = doc.getObject().getId();
     LOGGER.debug("Fetching status from Twitter for {}", id);
     Long tweetId = Long.valueOf(id.replace("id:twitter:tweets:", ""));
-    Status status = getTwitterClient().showStatus(tweetId);
-    return TwitterObjectFactory.getRawJSON(status);
+    Tweet tweet = client.show(
+        new StatusesShowRequest()
+            .withId(tweetId)
+    );
+    return tweet;
   }
 
 
-  protected Twitter getTwitterClient() {
-
-    if (this.client == null) {
+  protected Twitter getTwitterClient() throws InstantiationException {
 
-      String baseUrl = TwitterProviderUtil.baseUrl(config);
+    return Twitter.getInstance(config);
 
-      ConfigurationBuilder builder = new ConfigurationBuilder()
-          .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-          .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-          .setOAuthAccessToken(config.getOauth().getAccessToken())
-          .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-          .setIncludeEntitiesEnabled(true)
-          .setJSONStoreEnabled(true)
-          .setAsyncNumThreads(1)
-          .setRestBaseURL(baseUrl)
-          .setIncludeMyRetweetEnabled(Boolean.TRUE)
-          .setPrettyDebugEnabled(Boolean.TRUE);
-
-      this.client = new TwitterFactory(builder.build()).getInstance();
-    }
-    return this.client;
   }
 
   //Hardcore sleep to allow for catch up
-  protected void sleepAndTryAgain(Activity doc, String originalId) {
-    try {
-      //Attempt to fetchAndReplace with a backoff up to the limit then just reset the count and let the process continue
-      if (retryCount < MAX_ATTEMPTS) {
-        retryCount++;
-        LOGGER.debug("Sleeping for {} min due to excessive calls to Twitter API", (retryCount * 4));
-        Thread.sleep(BACKOFF * retryCount);
-        fetchAndReplace(doc, originalId);
-      } else {
-        retryCount = 0;
-      }
-    } catch (InterruptedException ex) {
-      LOGGER.warn("Thread sleep interrupted while waiting for twitter backoff");
-    }
-  }
+//  protected void sleepAndTryAgain(Activity doc, String originalId) {
+//    try {
+//      //Attempt to fetchAndReplace with a backoff up to the limit then just reset the count and let the process continue
+//      if (retryCount < MAX_ATTEMPTS) {
+//        retryCount++;
+//        LOGGER.debug("Sleeping for {} min due to excessive calls to Twitter API", (retryCount * 4));
+//        Thread.sleep(BACKOFF * retryCount);
+//        fetchAndReplace(doc, originalId);
+//      } else {
+//        retryCount = 0;
+//      }
+//    } catch (InterruptedException ex) {
+//      LOGGER.warn("Thread sleep interrupted while waiting for twitter backoff");
+//    }
+//  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
deleted file mode 100644
index ec43fba..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.twitter.TwitterConfiguration;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import twitter4j.RateLimitStatus;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-
-/**
- *  Handle expected and unexpected exceptions.
- */
-public class TwitterErrorHandler {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterErrorHandler.class);
-
-  // selected because 3 * 5 + n >= 15 for positive n
-  protected static long retry =
-      new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
-          StreamsConfigurator.getConfig().getConfig("twitter")
-      ).getRetrySleepMs();
-  protected static long retryMax =
-      new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
-          StreamsConfigurator.getConfig().getConfig("twitter")
-      ).getRetryMax();
-
-  @Deprecated
-  public static int handleTwitterError(Twitter twitter, Exception exception) {
-    return handleTwitterError( twitter, null, exception);
-  }
-
-  /**
-   * handleTwitterError.
-   * @param twitter Twitter
-   * @param id id
-   * @param exception exception
-   * @return
-   */
-  public static int handleTwitterError(Twitter twitter, Long id, Exception exception) {
-
-    if (exception instanceof TwitterException) {
-      TwitterException twitterException = (TwitterException)exception;
-
-      if (twitterException.exceededRateLimitation()) {
-
-        long millisUntilReset = retry;
-
-        final RateLimitStatus rateLimitStatus = twitterException.getRateLimitStatus();
-        if (rateLimitStatus != null) {
-          millisUntilReset = rateLimitStatus.getSecondsUntilReset() * 1000;
-        }
-
-        LOGGER.warn("Rate Limit Exceeded. Will retry in {} seconds...", millisUntilReset / 1000);
-
-        try {
-          Thread.sleep(millisUntilReset);
-        } catch (InterruptedException e1) {
-          Thread.currentThread().interrupt();
-        }
-
-        return 1;
-      } else if (twitterException.isCausedByNetworkIssue()) {
-        LOGGER.info("Twitter Network Issues Detected. Backing off...");
-        LOGGER.info("{} - {}", twitterException.getExceptionCode(), twitterException.getLocalizedMessage());
-        try {
-          Thread.sleep(retry);
-        } catch (InterruptedException e1) {
-          Thread.currentThread().interrupt();
-        }
-        return 1;
-      } else if (twitterException.isErrorMessageAvailable()) {
-        if (twitterException.getMessage().toLowerCase().contains("does not exist")) {
-          if ( id != null ) {
-            LOGGER.warn("User does not exist: {}", id);
-          } else {
-            LOGGER.warn("User does not exist");
-          }
-          return (int)retryMax;
-        } else {
-          return (int)retryMax / 3;
-        }
-      } else {
-        if (twitterException.getExceptionCode().equals("ced778ef-0c669ac0")) {
-          // This is a known weird issue, not exactly sure the cause, but you'll never be able to get the data.
-          return (int)retryMax / 3;
-        } else if (twitterException.getExceptionCode().equals("4be80492-0a7bf7c7")) {
-          // This is a 401 reflecting credentials don't have access to the requested resource.
-          if ( id != null ) {
-            LOGGER.warn("Authentication Exception accessing id: {}", id);
-          } else {
-            LOGGER.warn("Authentication Exception");
-          }
-          return (int)retryMax;
-        } else {
-          LOGGER.warn("Unknown Twitter Exception...");
-          LOGGER.warn("  Account: {}", twitter);
-          LOGGER.warn("   Access: {}", twitterException.getAccessLevel());
-          LOGGER.warn("     Code: {}", twitterException.getExceptionCode());
-          LOGGER.warn("  Message: {}", twitterException.getLocalizedMessage());
-          return (int)retryMax / 10;
-        }
-      }
-    } else if (exception instanceof RuntimeException) {
-      LOGGER.warn("TwitterGrabber: Unknown Runtime Error", exception.getMessage());
-      return (int)retryMax / 3;
-    } else {
-      LOGGER.info("Completely Unknown Exception: {}", exception);
-      return (int)retryMax / 3;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
new file mode 100644
index 0000000..b9448e6
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.FollowersIdsRequest;
+import org.apache.streams.twitter.api.FollowersIdsResponse;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Retrieve follower ids for a single user id or screen name.
+ */
+public class TwitterFollowersIdsProviderTask implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersIdsProviderTask.class);
+
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  protected Twitter client;
+  protected TwitterFollowingProvider provider;
+  protected FollowersIdsRequest request;
+
+  private int count = 0;
+
+  /**
+   * TwitterFollowersIdsProviderTask constructor.
+   * @param provider TwitterFollowingProvider
+   * @param twitter Twitter
+   * @param request FollowersIdsRequest
+   */
+  public TwitterFollowersIdsProviderTask(TwitterFollowingProvider provider, Twitter twitter, FollowersIdsRequest request) {
+    this.provider = provider;
+    this.client = twitter;
+    this.request = request;
+  }
+
+  @Override
+  public void run() {
+
+    Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
+
+    LOGGER.info("Thread Starting: {}", request.toString());
+
+    getFollowersIds(request);
+
+    LOGGER.info("Thread Finished: {}", request.toString());
+
+  }
+
+  int last_count = 0;
+  int page_count = 1;
+  int item_count = 0;
+  long cursor = 0;
+
+  private void getFollowersIds(FollowersIdsRequest request) {
+
+    do {
+
+      FollowersIdsResponse response = client.ids(request);
+
+      last_count = response.getIds().size();
+
+      if (response.getIds().size() > 0) {
+
+        for (Long id : response.getIds()) {
+
+          Follow follow = new Follow()
+              .withFollowee(
+                  new User()
+                      .withId(request.getId())
+                      .withScreenName(request.getScreenName()))
+              .withFollower(
+                  new User()
+                      .withId(id));
+
+          if (item_count < provider.getConfig().getMaxItems()) {
+            ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+            item_count++;
+          }
+
+        }
+
+      }
+      page_count++;
+      cursor = response.getNextCursor();
+      request.setCursor(cursor);
+    }
+    while (shouldContinuePulling(cursor, last_count, page_count, item_count));
+  }
+
+  public boolean shouldContinuePulling(long cursor, int count, int page_count, int item_count) {
+    return (
+        cursor > 0
+            && count > 0
+            && item_count < provider.getConfig().getMaxItems()
+            && page_count <= provider.getConfig().getMaxPages());
+  }
+
+}
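
Note on getFollowersIds above: the do/while is a cursor-pagination loop that stops when the next cursor is not positive, a page comes back empty, the item cap is reached, or the page cap is reached. The sketch below reproduces that control flow against an in-memory page source so the stop conditions can be exercised without a network call. CursorPagingSketch, Page, pull and demoPage are hypothetical names, and the first cursor value is passed in by the caller rather than assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongFunction;

/** Hypothetical illustration of the paging loop; not part of streams-provider-twitter. */
public class CursorPagingSketch {

  /** Stand-in for one page of an ids response: the ids plus the next cursor. */
  public static final class Page {
    final List<Long> ids;
    final long nextCursor;

    public Page(List<Long> ids, long nextCursor) {
      this.ids = ids;
      this.nextCursor = nextCursor;
    }
  }

  /** Pulls pages until any of the four stop conditions from the diff is hit. */
  public static List<Long> pull(LongFunction<Page> fetchPage, long firstCursor,
                                int maxItems, int maxPages) {
    List<Long> collected = new ArrayList<>();
    long cursor = firstCursor;
    int pageCount = 1;
    int lastCount;
    do {
      Page page = fetchPage.apply(cursor);
      lastCount = page.ids.size();
      for (Long id : page.ids) {
        if (collected.size() < maxItems) {
          collected.add(id);
        }
      }
      pageCount++;
      cursor = page.nextCursor;
    } while (cursor > 0
        && lastCount > 0
        && collected.size() < maxItems
        && pageCount <= maxPages);
    return collected;
  }

  /** Three fake pages of three ids each; the last page reports cursor 0. */
  public static Page demoPage(long cursor) {
    if (cursor == -1L) {
      return new Page(Arrays.asList(1L, 2L, 3L), 10L);
    }
    if (cursor == 10L) {
      return new Page(Arrays.asList(4L, 5L, 6L), 20L);
    }
    return new Page(Arrays.asList(7L, 8L, 9L), 0L);
  }

  public static void main(String[] args) {
    System.out.println(pull(CursorPagingSketch::demoPage, -1L, 100, 10));
    System.out.println(pull(CursorPagingSketch::demoPage, -1L, 4, 10));
    System.out.println(pull(CursorPagingSketch::demoPage, -1L, 100, 2));
  }
}
```

As in the diff, the page counter starts at 1 and is incremented before the check, so maxPages is the number of pages actually fetched.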
