STREAMS-496: Remove twitter4j dependency from streams-provider-twitter resolves #360 commit 05aa4ddc907c845fdbbfc9e8feb85cc0f9ecc2fb Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Mon Apr 17 17:04:24 2017 -0500
upgrade hosebird client because why not commit 3dc5f515fc067fbd50af431139b98c3bc408cff8 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Wed Apr 12 23:48:21 2017 -0500 PR feedback, juneau target version bump commit b67487a54f7e2b6676ab031f8ff0091393111f56 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Wed Apr 12 22:48:58 2017 -0500 PR feedback - javadoc accuracy commit 996c1b47e9dc485692dbc5a4c18f5754c697cd91 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Wed Apr 12 22:33:27 2017 -0500 clean up commit 8618abcad4e6087c0270ba8c54a201888071e0e8 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Tue Apr 11 16:45:11 2017 -0500 retry logic commit d7cd07110d602d24a07aad8e8f390ff0d98f1e19 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Mon Apr 3 15:11:32 2017 -0500 checkstyle clean up commit 069be4da745b9cd262159cca1982a9b52cb5cefc Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Mon Apr 3 14:56:01 2017 -0500 all integration tests passing using juneau 6.1.0-incubating commit 517b85f02e93c51d5fa36a704f7f0e0e2cdaed4e Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Mon Apr 3 11:01:25 2017 -0500 oauth signing working commit 06845f2dd848c91971c10f194f0d5503d5f4a964 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Tue Mar 28 13:20:03 2017 -0500 return empty array not null on error commit eddf003ba5b5555b10c3d51267c00225b6da4ef1 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Sun Mar 26 22:46:19 2017 -0500 TwitterOAuthRequestInterceptorTest.testProcess TwitterOAuthRequestInterceptorTest.testProcess is pretty much directly off the twitter dev guidelines and tests the entire auth process at once. passes. 
ITs still fail commit c623f7139421049d5b536576c739b60e959cf11e Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Sun Mar 26 17:55:09 2017 -0500 STREAMS-496: going through guide word-by-word and debugging line-by-line looking for mistakes commit 5a07c57b553f8c87af1d42e0bbef4f0795f17bf7 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Sun Mar 26 17:44:37 2017 -0500 STREAMS-496: more unit tests, all passing, ITs still fail commit edc2662fd68b9e1e04c20b8721a44a6ee9fb36e6 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Sun Mar 26 16:20:10 2017 -0500 adding unit testing to debug authentication commit 1885090c6d683c8a9a6df7c64f933529378ee965 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Sun Mar 26 13:09:24 2017 -0500 cleanup authentication. debugging “Bad Authentication data.” commit 434657439abc0dde2998a8cb213d8e8398f43ad6 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Sun Mar 26 12:56:34 2017 -0500 first implementation of oauth signing commit 0d172e76232e3d0bd48af8d6616716c444eb94f0 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Sun Mar 26 12:56:11 2017 -0500 STREAMS-496: consolidate httpcomponents version selection commit 78f30284caefb09e06818a9f435c3f380ee986a3 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Sat Mar 25 11:38:06 2017 -0500 STREAMS-496: WIP - twitter4j gone, src/main/java compiles commit c95bccd41b4f5f51d7e614e5927210458d1dbfbd Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Fri Mar 24 14:21:34 2017 -0500 implementations using RestClient commit 68d7976f928173dbd6de636c58d1a23dbe95eb15 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Fri Mar 24 13:39:52 2017 -0500 STREAMS-496: additional pojos and interfaces needed to maintain features commit a8fd796096296bb2e8af61b722c4a261acbba987 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Mon Mar 20 13:07:27 2017 -0500 more 
WIP commit 7dedbb42251e1c5214690e32f12a5efb5032dc92 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Date: Mon Mar 20 01:35:33 2017 -0500 STREAMS-496: WIP Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/67497a48 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/67497a48 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/67497a48 Branch: refs/heads/master Commit: 67497a488a9417dcdbf702f707729928500c8d5c Parents: 19caf33 Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Authored: Wed Apr 19 14:49:38 2017 -0500 Committer: Steve Blackmon @steveblackmon <sblack...@apache.org> Committed: Wed Apr 19 14:49:38 2017 -0500 ---------------------------------------------------------------------- pom.xml | 14 +- streams-components/streams-http/pom.xml | 1 - streams-contrib/streams-persist-riak/pom.xml | 3 - .../streams-provider-twitter/pom.xml | 25 +- .../apache/streams/twitter/api/Followers.java | 52 ++ .../org/apache/streams/twitter/api/Friends.java | 52 ++ .../apache/streams/twitter/api/Statuses.java | 62 +++ .../org/apache/streams/twitter/api/Twitter.java | 521 +++++++++++++++++++ .../api/TwitterOAuthRequestInterceptor.java | 236 +++++++++ .../twitter/api/TwitterRetryHandler.java | 162 ++++++ .../org/apache/streams/twitter/api/Users.java | 51 ++ .../converter/util/TwitterActivityUtil.java | 6 +- .../FetchAndReplaceTwitterProcessor.java | 107 ++-- .../twitter/provider/TwitterErrorHandler.java | 133 ----- .../TwitterFollowersIdsProviderTask.java | 124 +++++ .../TwitterFollowersListProviderTask.java | 120 +++++ .../provider/TwitterFollowingProvider.java | 188 +++++-- .../provider/TwitterFollowingProviderTask.java | 266 ---------- .../provider/TwitterFriendsIdsProviderTask.java | 125 +++++ .../TwitterFriendsListProviderTask.java | 119 +++++ .../provider/TwitterTimelineProvider.java | 157 +++--- 
.../provider/TwitterTimelineProviderTask.java | 81 ++- .../TwitterUserInformationProvider.java | 256 ++++----- .../TwitterUserInformationProviderTask.java | 77 +++ .../streams/twitter/TwitterConfiguration.json | 87 ---- .../twitter/TwitterFollowingConfiguration.json | 33 -- .../twitter/TwitterStreamConfiguration.json | 45 -- .../TwitterTimelineProviderConfiguration.json | 23 - .../TwitterUserInformationConfiguration.json | 25 - .../twitter/api/FollowersIdsRequest.json | 13 + .../twitter/api/FollowersIdsResponse.json | 32 ++ .../twitter/api/FollowersListRequest.json | 13 + .../twitter/api/FollowersListResponse.json | 33 ++ .../twitter/api/FollowingIdsRequest.json | 36 ++ .../twitter/api/FollowingListRequest.json | 41 ++ .../streams/twitter/api/FriendsIdsRequest.json | 13 + .../streams/twitter/api/FriendsIdsResponse.json | 32 ++ .../streams/twitter/api/FriendsListRequest.json | 13 + .../twitter/api/FriendsListResponse.json | 33 ++ .../twitter/api/StatusesLookupRequest.json | 35 ++ .../twitter/api/StatusesShowRequest.json | 32 ++ .../api/StatusesUserTimelineRequest.json | 57 ++ .../streams/twitter/api/UsersLookupRequest.json | 33 ++ .../streams/twitter/api/UsersShowRequest.json | 27 + .../twitter/config/TwitterConfiguration.json | 92 ++++ .../config/TwitterFollowingConfiguration.json | 33 ++ .../config/TwitterStreamConfiguration.json | 45 ++ .../TwitterTimelineProviderConfiguration.json | 23 + .../TwitterUserInformationConfiguration.json | 25 + .../api/TwitterOAuthRequestInterceptorTest.java | 124 +++++ .../providers/TwitterTimelineProviderIT.java | 2 +- streams-monitoring/pom.xml | 5 +- 52 files changed, 2911 insertions(+), 1032 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index e83dab2..d779fa9 100644 --- a/pom.xml +++ b/pom.xml @@ -359,8 +359,8 @@ 
<facebook4j.version>2.4.7</facebook4j.version> <mockito.version>1.10.19</mockito.version> <powermock.version>1.6.5</powermock.version> - <httpcomponents.core.version>4.4.5</httpcomponents.core.version> - <httpcomponents.client.version>4.5.2</httpcomponents.client.version> + <httpcomponents.core.version>4.4.6</httpcomponents.core.version> + <httpcomponents.client.version>4.5.3</httpcomponents.client.version> <doxia.version>1.7</doxia.version> <!-- osgi configuration --> @@ -978,6 +978,16 @@ <version>${commons-lang3.version}</version> </dependency> <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + <version>${httpcomponents.core.version}</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>${httpcomponents.client.version}</version> + </dependency> + <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> <version>${typesafe.config.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-components/streams-http/pom.xml ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/pom.xml b/streams-components/streams-http/pom.xml index 4d97bb6..603c02e 100644 --- a/streams-components/streams-http/pom.xml +++ b/streams-components/streams-http/pom.xml @@ -62,7 +62,6 @@ <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> - <version>${httpcomponents.client.version}</version> <exclusions> <exclusion> <groupId>commons-logging</groupId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-persist-riak/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-riak/pom.xml b/streams-contrib/streams-persist-riak/pom.xml index 3b24fc2..f96c2c3 100644 --- 
a/streams-contrib/streams-persist-riak/pom.xml +++ b/streams-contrib/streams-persist-riak/pom.xml @@ -30,8 +30,6 @@ <description>Riak Module</description> <properties> - <httpcomponents.core.version>4.3.5</httpcomponents.core.version> - <httpcomponents.client.version>4.3.5</httpcomponents.client.version> <riak.version>2.0.6</riak.version> </properties> @@ -58,7 +56,6 @@ <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> - <version>${httpcomponents.client.version}</version> <exclusions> <exclusion> <groupId>commons-logging</groupId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml index 91d2f88..6a0778d 100644 --- a/streams-contrib/streams-provider-twitter/pom.xml +++ b/streams-contrib/streams-provider-twitter/pom.xml @@ -90,22 +90,31 @@ <dependency> <groupId>com.twitter</groupId> <artifactId>hbc-core</artifactId> - <version>2.1.0</version> + <version>2.2.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> </dependency> <dependency> - <groupId>org.twitter4j</groupId> - <artifactId>twitter4j-core</artifactId> - <version>4.0.3</version> + <groupId>org.apache.juneau</groupId> + <artifactId>juneau-rest-client</artifactId> + <version>6.1.0-incubating</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <exclusions> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-all</artifactId> <version>1.3</version> - <scope>test</scope> </dependency> <dependency> <groupId>org.apache.streams</groupId> @@ -114,6 
+123,12 @@ <scope>test</scope> <type>test-jar</type> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Followers.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Followers.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Followers.java new file mode 100644 index 0000000..5e04e6e --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Followers.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.twitter.api; + +import org.apache.streams.twitter.pojo.Tweet; + +import java.util.List; + +/** + * Returns a collection of the most recent Tweets posted by the user indicated by the screen_name or user_id parameters. 
+ * + * @see <a href="https://dev.twitter.com/rest/reference/get/followers/ids">https://api.twitter.com/1.1/statuses/user_timeline.json</a> + */ +public interface Followers { + + /** + * Returns a cursored collection of user IDs for every user following the specified user. + * + * @param parameters {@link org.apache.streams.twitter.api.FollowersIdsRequest} + * @return List < Tweet > + * @see <a href="https://dev.twitter.com/rest/reference/get/followers/ids">https://dev.twitter.com/rest/reference/get/followers/ids</a> + * + */ + public FollowersIdsResponse ids(FollowersIdsRequest parameters); + + /** + * Returns a cursored collection of user objects for users following the specified user. + * + * @param parameters {@link org.apache.streams.twitter.api.FollowersListRequest} + * @return List < Tweet > + * @see <a href="https://dev.twitter.com/rest/reference/get/followers/list">https://dev.twitter.com/rest/reference/get/followers/list</a> + * + */ + public FollowersListResponse list(FollowersListRequest parameters); + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Friends.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Friends.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Friends.java new file mode 100644 index 0000000..d0d1f72 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Friends.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.twitter.api; + +import org.apache.streams.twitter.pojo.Tweet; + +import java.util.List; + +/** + * Returns a collection of the most recent Tweets posted by the user indicated by the screen_name or user_id parameters. + * + * @see <a href="https://dev.twitter.com/rest/reference/get/statuses/user_timeline">https://api.twitter.com/1.1/statuses/user_timeline.json</a> + */ +public interface Friends { + + /** + * Returns a cursored collection of user IDs for every user the specified user is following. + * + * @param parameters {@link org.apache.streams.twitter.api.FriendsIdsRequest} + * @return List<Tweet> + * @see <a href="https://dev.twitter.com/rest/reference/get/friends/ids">https://dev.twitter.com/rest/reference/get/friends/ids</a> + * + */ + public FriendsIdsResponse ids(FriendsIdsRequest parameters); + + /** + * Returns a cursored collection of user objects for every user the specified user is following. 
+ * + * @param parameters {@link org.apache.streams.twitter.api.FriendsListRequest} + * @return List<Tweet> + * @see <a href="https://dev.twitter.com/rest/reference/get/friends/list">https://dev.twitter.com/rest/reference/get/friends/list</a> + * + */ + public FriendsListResponse list(FriendsListRequest parameters); + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Statuses.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Statuses.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Statuses.java new file mode 100644 index 0000000..c9945b0 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Statuses.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.twitter.api; + +import org.apache.streams.twitter.pojo.Tweet; + +import java.util.List; + +/** + * Returns a collection of the most recent Tweets posted by the user indicated by the screen_name or user_id parameters. 
+ * + * @see <a href="https://dev.twitter.com/rest/reference/get/statuses/user_timeline">https://api.twitter.com/1.1/statuses/user_timeline.json</a> + */ +public interface Statuses { + + /** + * Returns fully-hydrated Tweet objects for up to 100 Tweets per request, as specified by comma-separated values passed to the id parameter. + * + * @param parameters {@link org.apache.streams.twitter.api.StatusesLookupRequest} + * @return List<Tweet> + * @see <a href="https://dev.twitter.com/rest/reference/get/statuses/lookup">https://dev.twitter.com/rest/reference/get/statuses/lookup</a> + * + */ + public List<Tweet> lookup(StatusesLookupRequest parameters); + + /** + * Returns a single Tweet, specified by the id parameter. The Tweet’s author will also be embedded within the Tweet. + * + * @param parameters {@link org.apache.streams.twitter.api.StatusesShowRequest} + * @return List<Tweet> + * @see <a href="https://dev.twitter.com/rest/reference/get/statuses/show/id">https://dev.twitter.com/rest/reference/get/statuses/show/id</a> + * + */ + public Tweet show(StatusesShowRequest parameters); + + /** + * Returns a collection of the most recent Tweets posted by the user indicated by the screen_name or user_id parameters. 
+ * + * @param parameters {@link org.apache.streams.twitter.api.StatusesUserTimelineRequest} + * @return List<Tweet> + * @see <a href="https://dev.twitter.com/rest/reference/get/statuses/user_timeline">https://dev.twitter.com/rest/reference/get/statuses/user_timeline</a> + * + */ + public List<Tweet> userTimeline(StatusesUserTimelineRequest parameters); + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java new file mode 100644 index 0000000..f4570d6 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java @@ -0,0 +1,521 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.twitter.api; + +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.twitter.TwitterConfiguration; +import org.apache.streams.twitter.pojo.Tweet; +import org.apache.streams.twitter.pojo.User; +import org.apache.streams.twitter.provider.TwitterProviderUtil; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpRequestInterceptor; +import org.apache.http.client.HttpRequestRetryHandler; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.params.ClientPNames; +import org.apache.http.client.params.CookiePolicy; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.juneau.json.JsonParser; +import org.apache.juneau.parser.ParseException; +import org.apache.juneau.plaintext.PlainTextSerializer; +import org.apache.juneau.rest.client.RestCall; +import org.apache.juneau.rest.client.RestCallException; +import org.apache.juneau.rest.client.RestClient; +//import org.apache.juneau.rest.client.RestClientBuilder; +import org.apache.juneau.rest.client.RetryOn; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Implementation of all twitter interfaces using juneau. 
+ */ +public class Twitter implements Followers, Friends, Statuses, Users { + + private static final Logger LOGGER = LoggerFactory.getLogger(Twitter.class); + + private static Map<TwitterConfiguration, Twitter> INSTANCE_MAP = new ConcurrentHashMap<>(); + + private TwitterConfiguration configuration; + + private ObjectMapper mapper; + + private String rootUrl; + + private CloseableHttpClient httpclient; + + private HttpRequestInterceptor oauthInterceptor; + + RestClient restClient; + + private Twitter(TwitterConfiguration configuration) throws InstantiationException { + this.configuration = configuration; + this.rootUrl = TwitterProviderUtil.baseUrl(configuration); + this.oauthInterceptor = new TwitterOAuthRequestInterceptor(configuration.getOauth()); + this.httpclient = HttpClientBuilder.create() + .addInterceptorFirst(oauthInterceptor) + .setDefaultRequestConfig(RequestConfig.custom() + .setConnectionRequestTimeout(5000) + .setConnectTimeout(5000) + .setSocketTimeout(5000) + .setCookieSpec("easy") + .build() + ) + .setMaxConnPerRoute(20) + .setMaxConnTotal(100) + .build(); + +// TODO: juneau-6.3.x-incubating +// this.restClient = new RestClientBuilder() +// .httpClient(httpclient, true) +// .parser(JsonParser.class) +// .rootUrl(rootUrl) +// .retryable( +// configuration.getRetryMax().intValue(), +// configuration.getRetrySleepMs(), +// new TwitterRetryHandler()) +// .build(); + this.restClient = new RestClient() + .setHttpClient(httpclient) + .setParser(JsonParser.class) + .setRootUrl(rootUrl); + + this.mapper = StreamsJacksonMapper.getInstance(); + } + + public static Twitter getInstance(TwitterConfiguration configuration) throws InstantiationException { + if (INSTANCE_MAP.containsKey(configuration) && INSTANCE_MAP.get(configuration) != null) { + return INSTANCE_MAP.get(configuration); + } else { + Twitter twitter = new Twitter(configuration); + INSTANCE_MAP.put(configuration, twitter); + return INSTANCE_MAP.get(configuration); + } + } + + @Override + public 
List<Tweet> userTimeline(StatusesUserTimelineRequest parameters) { + try { +// TODO: juneau-6.3.x-incubating +// Statuses restStatuses = restClient.getRemoteableProxy("/statuses/user_timeline.json", Statuses.class); +// List<Tweet> result = restStatuses.userTimeline(parameters); +// return result; + URIBuilder uriBuilder = new URIBuilder() + .setPath("/statuses/user_timeline.json"); + if( StringUtils.isNotBlank(parameters.getUserId().toString())) { + uriBuilder.addParameter("user_id", parameters.getUserId().toString()); + } + if( StringUtils.isNotBlank(parameters.getScreenName())) { + uriBuilder.addParameter("screen_name", parameters.getScreenName()); + } + if( Objects.nonNull(parameters.getSinceId()) && StringUtils.isNotBlank(parameters.getSinceId().toString())) { + uriBuilder.addParameter("since_id", parameters.getSinceId().toString()); + } + if( Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) { + uriBuilder.addParameter("count", parameters.getCount().toString()); + } + if( Objects.nonNull(parameters.getMaxId()) && StringUtils.isNotBlank(parameters.getMaxId().toString())) { + uriBuilder.addParameter("max_id", parameters.getMaxId().toString()); + } + if( Objects.nonNull(parameters.getTrimUser()) && StringUtils.isNotBlank(parameters.getTrimUser().toString())) { + uriBuilder.addParameter("trim_user", parameters.getTrimUser().toString()); + } + if( Objects.nonNull(parameters.getExcludeReplies()) && StringUtils.isNotBlank(parameters.getExcludeReplies().toString())) { + uriBuilder.addParameter("exclude_replies", parameters.getExcludeReplies().toString()); + } + if( Objects.nonNull(parameters.getContributorDetails()) && StringUtils.isNotBlank(parameters.getContributorDetails().toString())) { + uriBuilder.addParameter("contributor_details", parameters.getContributorDetails().toString()); + } + if( Objects.nonNull(parameters.getIncludeRts()) && StringUtils.isNotBlank(parameters.getIncludeRts().toString())) { + 
uriBuilder.addParameter("include_rts", parameters.getIncludeRts().toString()); + } + RestCall restCall = restClient.doGet(uriBuilder.build().toString()); + try { + String restResponseEntity = restCall + .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler()) + .getResponseAsString(); + ArrayNode resultArrayNode = mapper.readValue(restResponseEntity, ArrayNode.class); + List<Tweet> result = new ArrayList(); + resultArrayNode.iterator().forEachRemaining(item -> result.add(mapper.convertValue(item, Tweet.class))); + return result; + } catch (RestCallException e) { + LOGGER.warn("RestCallException", e); + } + } catch (IOException e) { + LOGGER.warn("IOException", e); + } catch (URISyntaxException e) { + LOGGER.warn("URISyntaxException", e); + } + return new ArrayList<>(); + } + + @Override + public List<Tweet> lookup(StatusesLookupRequest parameters) { +// TODO: juneau-6.3.x-incubating +// Statuses restStatuses = restClient.getRemoteableProxy("/statuses/lookup.json", Statuses.class); +// List<Tweet> result = restStatuses.lookup(parameters); +// return result; + String ids = StringUtils.join(parameters.getId(), ','); + try { + URIBuilder uriBuilder = new URIBuilder() + .setPath("/statuses/lookup.json"); + if( Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) { + uriBuilder.addParameter("id", parameters.getId().toString()); + } + if( Objects.nonNull(parameters.getTrimUser()) && StringUtils.isNotBlank(parameters.getTrimUser().toString())) { + uriBuilder.addParameter("trim_user", parameters.getTrimUser().toString()); + } + if( Objects.nonNull(parameters.getIncludeEntities()) && StringUtils.isNotBlank(parameters.getIncludeEntities().toString())) { + uriBuilder.addParameter("include_entities", parameters.getIncludeEntities().toString()); + } + if( Objects.nonNull(parameters.getMap()) && StringUtils.isNotBlank(parameters.getMap().toString())) { + 
uriBuilder.addParameter("map", parameters.getMap().toString()); + } + RestCall restCall = restClient.doGet(uriBuilder.build().toString()); + try { + String restResponseEntity = restCall + .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler()) + .getResponseAsString(); + ArrayNode resultArrayNode = mapper.readValue(restResponseEntity, ArrayNode.class); + List<Tweet> result = new ArrayList(); + resultArrayNode.iterator().forEachRemaining(item -> result.add(mapper.convertValue(item, Tweet.class))); + //List<Tweet> result = restCall.getResponse(LinkedList.class, Tweet.class); + return result; + } catch (RestCallException e) { + LOGGER.warn("RestCallException", e); + } + } catch (IOException e) { + LOGGER.warn("IOException", e); + } catch (URISyntaxException e) { + LOGGER.warn("URISyntaxException", e); + } + return new ArrayList<>(); + } + + @Override + public Tweet show(StatusesShowRequest parameters) { +// TODO: juneau-6.3.x-incubating +// Statuses restStatuses = restClient.getRemoteableProxy("/statuses/show.json", Statuses.class); +// Tweet result = restStatuses.show(parameters); +// return result; + try { + URIBuilder uriBuilder = new URIBuilder() + .setPath("/statuses/show.json"); + if (Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) { + uriBuilder.addParameter("id", parameters.getId().toString()); + } + if (Objects.nonNull(parameters.getTrimUser()) && StringUtils.isNotBlank(parameters.getTrimUser().toString())) { + uriBuilder.addParameter("trim_user", parameters.getTrimUser().toString()); + } + if (Objects.nonNull(parameters.getIncludeEntities()) && StringUtils.isNotBlank(parameters.getIncludeEntities().toString())) { + uriBuilder.addParameter("include_entities", parameters.getIncludeEntities().toString()); + } + if (Objects.nonNull(parameters.getIncludeMyRetweet()) && StringUtils.isNotBlank(parameters.getIncludeMyRetweet().toString())) { + 
uriBuilder.addParameter("include_my_retweet", parameters.getIncludeMyRetweet().toString()); + } + RestCall restCall = restClient.doGet(uriBuilder.build().toString()); + try { + String restResponseEntity = restCall + .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler()) + .getResponseAsString(); + //Tweet result = restCall.getResponse(Tweet.class); + Tweet result = mapper.readValue(restResponseEntity, Tweet.class); + return result; + } catch (RestCallException e) { + LOGGER.warn("RestCallException", e); + } + } catch (IOException e) { + LOGGER.warn("IOException", e); + } catch (URISyntaxException e) { + LOGGER.warn("URISyntaxException", e); + } + return null; + } + + @Override + public FriendsIdsResponse ids(FriendsIdsRequest parameters) { +// TODO: juneau-6.3.x-incubating +// Friends restFriends = restClient.getRemoteableProxy("/friends/ids.json", Friends.class); +// FriendsIdsResponse result = restFriends.ids(parameters); +// return result; + try { + URIBuilder uriBuilder = new URIBuilder() + .setPath("/friends/ids.json"); + if( Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) { + uriBuilder.addParameter("count", parameters.getCount().toString()); + } + if( Objects.nonNull(parameters.getCursor()) && StringUtils.isNotBlank(parameters.getCursor().toString())) { + uriBuilder.addParameter("cursor", parameters.getCursor().toString()); + } + if( Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) { + uriBuilder.addParameter("id", parameters.getId().toString()); + } + if( StringUtils.isNotBlank(parameters.getScreenName())) { + uriBuilder.addParameter("screen_name", parameters.getScreenName()); + } + if( Objects.nonNull(parameters.getStringifyIds()) && StringUtils.isNotBlank(parameters.getStringifyIds().toString())) { + uriBuilder.addParameter("stringify_ids", parameters.getStringifyIds().toString()); + } + 
RestCall restCall = restClient.doGet(uriBuilder.build().toString()); + try { + String restResponseEntity = restCall + .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler()) + .getResponseAsString(); + //FriendsIdsResponse result = restCall.getResponse(FriendsIdsResponse.class); + FriendsIdsResponse result = mapper.readValue(restResponseEntity, FriendsIdsResponse.class); + return result; + } catch (RestCallException e) { + LOGGER.warn("RestCallException", e); + } + } catch (IOException e) { + LOGGER.warn("IOException", e); + } catch (URISyntaxException e) { + LOGGER.warn("URISyntaxException", e); + } + return null; + } + + @Override + public FriendsListResponse list(FriendsListRequest parameters) { +// TODO: juneau-6.3.x-incubating +// Friends restFriends = restClient.getRemoteableProxy("/friends/list.json", Friends.class); +// FriendsListResponse result = restFriends.list(parameters); +// return result; + try { + URIBuilder uriBuilder = new URIBuilder() + .setPath("/friends/list.json"); + if (Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) { + uriBuilder.addParameter("count", parameters.getCount().toString()); + } + if (Objects.nonNull(parameters.getCursor()) && StringUtils.isNotBlank(parameters.getCursor().toString())) { + uriBuilder.addParameter("cursor", parameters.getCursor().toString()); + } + if (Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) { + uriBuilder.addParameter("id", parameters.getId().toString()); + } + if (Objects.nonNull(parameters.getIncludeUserEntities()) && StringUtils.isNotBlank(parameters.getIncludeUserEntities().toString())) { + uriBuilder.addParameter("include_user_entities", parameters.getIncludeUserEntities().toString()); + } + if (StringUtils.isNotBlank(parameters.getScreenName())) { + uriBuilder.addParameter("screen_name", parameters.getScreenName()); + } + if 
(Objects.nonNull(parameters.getSkipStatus()) && StringUtils.isNotBlank(parameters.getSkipStatus().toString())) { + uriBuilder.addParameter("skip_status", parameters.getSkipStatus().toString()); + } + RestCall restCall = restClient.doGet(uriBuilder.build().toString()); + try { + String restResponseEntity = restCall + .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler()) + .getResponseAsString(); + //FriendsListResponse result = restCall.getResponse(FriendsListResponse.class); + FriendsListResponse result = mapper.readValue(restResponseEntity, FriendsListResponse.class); + return result; + } catch (RestCallException e) { + LOGGER.warn("RestCallException", e); + } + }catch (IOException e) { + LOGGER.warn("IOException", e); + } catch (URISyntaxException e) { + LOGGER.warn("URISyntaxException", e); + } + return null; + } + + @Override + public FollowersIdsResponse ids(FollowersIdsRequest parameters) { +// TODO: juneau-6.3.x-incubating +// Followers restFollowers = restClient.getRemoteableProxy("/friends/list.json", Followers.class); +// FollowersIdsResponse result = restFollowers.ids(parameters); +// return result; + try { + URIBuilder uriBuilder = new URIBuilder() + .setPath("/followers/ids.json"); + if (Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) { + uriBuilder.addParameter("count", parameters.getCount().toString()); + } + if (Objects.nonNull(parameters.getCursor()) && StringUtils.isNotBlank(parameters.getCursor().toString())) { + uriBuilder.addParameter("cursor", parameters.getCursor().toString()); + } + if (Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) { + uriBuilder.addParameter("id", parameters.getId().toString()); + } + if (StringUtils.isNotBlank(parameters.getScreenName())) { + uriBuilder.addParameter("screen_name", parameters.getScreenName()); + } + if 
(Objects.nonNull(parameters.getStringifyIds()) && StringUtils.isNotBlank(parameters.getStringifyIds().toString())) { + uriBuilder.addParameter("stringify_ids", parameters.getStringifyIds().toString()); + } + RestCall restCall = restClient.doGet(uriBuilder.build().toString()); + try { + //FollowersIdsResponse result = restCall.getResponse(FollowersIdsResponse.class); + String restResponseEntity = restCall + .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler()) + .getResponseAsString(); + FollowersIdsResponse result = mapper.readValue(restResponseEntity, FollowersIdsResponse.class); + return result; + } catch (RestCallException e) { + LOGGER.warn("RestCallException", e); + } + } catch (IOException e) { + LOGGER.warn("IOException", e); + } catch (URISyntaxException e) { + LOGGER.warn("URISyntaxException", e); + } + return null; + } + + @Override + public FollowersListResponse list(FollowersListRequest parameters) { +// TODO: juneau-6.3.x-incubating +// Followers restFollowers = restClient.getRemoteableProxy("/friends/list.json", Followers.class); +// FollowersListResponse result = restFollowers.list(parameters); +// return result; + try { + URIBuilder uriBuilder = + new URIBuilder() + .setPath("/followers/list.json"); + if (Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) { + uriBuilder.addParameter("count", parameters.getCount().toString()); + } + if (Objects.nonNull(parameters.getCursor()) && StringUtils.isNotBlank(parameters.getCursor().toString())) { + uriBuilder.addParameter("cursor", parameters.getCursor().toString()); + } + if (Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) { + uriBuilder.addParameter("id", parameters.getId().toString()); + } + if (Objects.nonNull(parameters.getIncludeUserEntities()) && StringUtils.isNotBlank(parameters.getIncludeUserEntities().toString())) { + 
uriBuilder.addParameter("include_user_entities", parameters.getIncludeUserEntities().toString()); + } + if (StringUtils.isNotBlank(parameters.getScreenName())) { + uriBuilder.addParameter("screen_name", parameters.getScreenName()); + } + if (Objects.nonNull(parameters.getSkipStatus()) && StringUtils.isNotBlank(parameters.getSkipStatus().toString())) { + uriBuilder.addParameter("skip_status", parameters.getSkipStatus().toString()); + } + RestCall restCall = restClient.doGet(uriBuilder.build().toString()); + try { + String restResponseEntity = restCall + .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler()) + .getResponseAsString(); + //FollowersListResponse result = restCall.getResponse(FollowersListResponse.class); + FollowersListResponse result = mapper.readValue(restResponseEntity, FollowersListResponse.class); + return result; + } catch (RestCallException e) { + LOGGER.warn("RestCallException", e); + } + }catch (IOException e) { + LOGGER.warn("IOException", e); + } catch (URISyntaxException e) { + LOGGER.warn("URISyntaxException", e); + } + return null; + } + + @Override + public List<User> lookup(UsersLookupRequest parameters) { +// TODO: juneau-6.3.x-incubating +// Users restUsers = restClient.getRemoteableProxy("/users/lookup.json", Users.class); +// List<User> result = restUsers.lookup(parameters); +// return result; + String user_ids = StringUtils.join(parameters.getUserId(), ','); + String screen_names = StringUtils.join(parameters.getScreenName(), ','); + try { + URIBuilder uriBuilder = + new URIBuilder() + .setPath("/users/lookup.json"); + if (Objects.nonNull(parameters.getIncludeEntities()) && StringUtils.isNotBlank(parameters.getIncludeEntities().toString())) { + uriBuilder.addParameter("include_entities", parameters.getIncludeEntities().toString()); + } + if (Objects.nonNull(screen_names) && StringUtils.isNotBlank(screen_names)) { + uriBuilder.addParameter("screen_name", 
screen_names); + } + if (Objects.nonNull(user_ids) && StringUtils.isNotBlank(user_ids)) { + uriBuilder.addParameter("user_id", user_ids); + } + RestCall restCall = restClient.doGet(uriBuilder.build().toString()); +// List<User> result = restCall.getResponse(LinkedList.class, User.class); + try { + String restResponseEntity = restCall + .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler()) + .getResponseAsString(); + ArrayNode resultArrayNode = mapper.readValue(restResponseEntity, ArrayNode.class); + List<User> result = new ArrayList(); + resultArrayNode.iterator().forEachRemaining(item -> result.add(mapper.convertValue(item, User.class))); + return result; + } catch (RestCallException e) { + LOGGER.warn("RestCallException", e); + } + } catch (IOException e) { + LOGGER.warn("IOException", e); + } catch (URISyntaxException e) { + LOGGER.warn("URISyntaxException", e); + } + return new ArrayList<>(); + } + + @Override + public User show(UsersShowRequest parameters) { +// TODO: juneau-6.3.x-incubating +// Users restUsers = restClient.getRemoteableProxy("/users/lookup.json", Users.class); +// User result = restUsers.show(parameters); +// return result; + try { + URIBuilder uriBuilder = + new URIBuilder() + .setPath("/users/show.json"); + if (Objects.nonNull(parameters.getIncludeEntities()) && StringUtils.isNotBlank(parameters.getIncludeEntities().toString())) { + uriBuilder.addParameter("include_entities", parameters.getIncludeEntities().toString()); + } + if (Objects.nonNull(parameters.getScreenName()) && StringUtils.isNotBlank(parameters.getScreenName())) { + uriBuilder.addParameter("screen_name", parameters.getScreenName()); + } + if (Objects.nonNull(parameters.getUserId()) && StringUtils.isNotBlank(parameters.getUserId().toString())) { + uriBuilder.addParameter("user_id", parameters.getUserId().toString()); + } + RestCall restCall = restClient.doGet(uriBuilder.build().toString()); + try { + String 
restResponseEntity = restCall + .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler()) + .getResponseAsString(); + User result = mapper.readValue(restResponseEntity, User.class); + return result; + } catch (RestCallException e) { + LOGGER.warn("RestCallException", e); + } + } catch (IOException e) { + LOGGER.warn("IOException", e); + } catch (URISyntaxException e) { + LOGGER.warn("URISyntaxException", e); + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterOAuthRequestInterceptor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterOAuthRequestInterceptor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterOAuthRequestInterceptor.java new file mode 100644 index 0000000..24c7b04 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterOAuthRequestInterceptor.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.streams.twitter.api;

import org.apache.streams.twitter.TwitterOAuthConfiguration;

import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestWrapper;
import org.apache.http.entity.StringEntity;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.StringJoiner;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;

/**
 * Handles request signing to api.twitter.com.
 *
 * <p>For each outgoing request the interceptor gathers the OAuth protocol
 * parameters, the query-string parameters and, for POST requests, the
 * parameters of the request body, signs them with HMAC-SHA1 and sets the
 * resulting {@code Authorization} header.
 */
public class TwitterOAuthRequestInterceptor implements HttpRequestInterceptor {

  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterOAuthRequestInterceptor.class);

  private static final String oauth_signature_encoding = "UTF-8";
  private static final String oauth_signature_method = "HMAC-SHA1";
  private static final String oauth_version = "1.0";

  // java.util.Base64 replaces the unsupported JDK-internal sun.misc.BASE64Encoder.
  private static final Base64.Encoder base64Encoder = Base64.getEncoder();

  TwitterOAuthConfiguration oAuthConfiguration;

  /**
   * Creates an interceptor that signs with the supplied credentials.
   *
   * @param oAuthConfiguration consumer key/secret and access token/secret used for signing
   */
  public TwitterOAuthRequestInterceptor(TwitterOAuthConfiguration oAuthConfiguration) {
    this.oAuthConfiguration = oAuthConfiguration;
  }

  /**
   * Computes the OAuth signature for the request and sets its {@code Authorization} header.
   *
   * <p>If the signature cannot be computed the failure is logged and the
   * request is left without an {@code Authorization} header.
   *
   * @param httpRequest request to sign; must be an {@link HttpRequestWrapper}
   * @param httpContext unused
   * @throws IOException if the request body cannot be read or decoded
   */
  @Override
  public void process(HttpRequest httpRequest, HttpContext httpContext) throws HttpException, IOException {

    HttpRequestWrapper wrapper = (HttpRequestWrapper) httpRequest;

    String oauth_nonce = generateNonce();

    String oauth_timestamp = generateTimestamp();

    Map<String,String> oauthParamMap = new HashMap<>();
    oauthParamMap.put("oauth_consumer_key", oAuthConfiguration.getConsumerKey());
    oauthParamMap.put("oauth_nonce", oauth_nonce);
    oauthParamMap.put("oauth_signature_method", oauth_signature_method);
    oauthParamMap.put("oauth_timestamp", oauth_timestamp);
    oauthParamMap.put("oauth_token", oAuthConfiguration.getAccessToken());
    oauthParamMap.put("oauth_version", oauth_version);

    String request_host = wrapper.getTarget().toString().replace(":443","");
    String request_uri = httpRequest.getRequestLine().getUri();

    // A URI without '?' has no query string; previously this case threw
    // StringIndexOutOfBoundsException from substring(0, -1).
    int query_start = request_uri.indexOf('?');
    String request_path = query_start < 0 ? request_uri : request_uri.substring(0, query_start);

    Map<String,String> allParamsMap = new HashMap<>(oauthParamMap);

    if (query_start >= 0) {
      putDecodedParameters(request_uri.substring(query_start + 1), allParamsMap);
    }

    if (wrapper.getOriginal() instanceof HttpPost) {
      HttpPost post = (HttpPost) wrapper.getOriginal();
      if (post.getEntity() != null) {
        // Body parameters are '&'-separated and percent-encoded, exactly like the query string.
        putDecodedParameters(EntityUtils.toString(post.getEntity()), allParamsMap);
      }
    }

    allParamsMap = encodeMap(allParamsMap);

    String signature_parameter_string = generateSignatureParameterString(allParamsMap);

    String signature_base_string = generateSignatureBaseString(wrapper.getMethod(), request_host + request_path, signature_parameter_string);

    String signing_key = encode(oAuthConfiguration.getConsumerSecret()) + "&" + encode(oAuthConfiguration.getAccessTokenSecret());

    String oauth_signature;
    try {
      oauth_signature = computeSignature(signature_base_string, signing_key);
    } catch (GeneralSecurityException e) {
      LOGGER.warn("GeneralSecurityException", e);
      return;
    }

    oauthParamMap.put("oauth_signature", oauth_signature);

    String authorization_header_string = generateAuthorizationHeaderString(oauthParamMap);

    httpRequest.setHeader("Authorization", authorization_header_string);

  }

  /**
   * Splits an {@code a=1&b=2} style string and stores the decoded pairs in {@code target}.
   *
   * <p>Pairs are split before decoding so that an encoded {@code &} or
   * {@code =} inside a value stays part of that value. Empty segments are
   * skipped and a segment without {@code =} is stored with an empty value.
   */
  private static void putDecodedParameters(String encodedParameters, Map<String, String> target)
      throws UnsupportedEncodingException {
    for (String pair : encodedParameters.split("&")) {
      if (pair.isEmpty()) {
        continue;
      }
      int separator = pair.indexOf('=');
      String rawKey = separator < 0 ? pair : pair.substring(0, separator);
      String rawValue = separator < 0 ? "" : pair.substring(separator + 1);
      target.put(
          URLDecoder.decode(rawKey, oauth_signature_encoding),
          URLDecoder.decode(rawValue, oauth_signature_encoding));
    }
  }

  /**
   * Returns the current time as seconds since the epoch.
   *
   * @return decimal string of the current epoch seconds
   */
  public String generateTimestamp() {
    return Long.toString(System.currentTimeMillis() / 1000L);
  }

  /**
   * Returns a fresh nonce: the Base64 form of a random UUID's 32 hex digits.
   *
   * @return nonce string
   */
  public String generateNonce() {
    String uuid_string = UUID.randomUUID().toString().replace("-", "");
    return base64Encoder.encodeToString(uuid_string.getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Returns a new map whose keys and values are the {@link #encode(String) encoded}
   * keys and values of {@code map}.
   *
   * @param map parameters to encode; not modified
   * @return new map of encoded keys to encoded values
   */
  public static Map<String, String> encodeMap(Map<String, String> map) {
    Map<String,String> newMap = new HashMap<>();
    for (Map.Entry<String, String> entry : map.entrySet()) {
      newMap.put(encode(entry.getKey()), encode(entry.getValue()));
    }
    return newMap;
  }

  /**
   * Builds the {@code Authorization} header value: {@code OAuth } followed by
   * the parameters sorted by key, each rendered as {@code key="value"} with
   * both parts encoded, joined by {@code ", "}.
   *
   * @param oauthParamMap raw (unencoded) OAuth parameters including the signature
   * @return header value
   */
  public static String generateAuthorizationHeaderString(Map<String,String> oauthParamMap) {
    SortedSet<String> sortedKeys = new TreeSet<>(oauthParamMap.keySet());

    StringJoiner stringJoiner = new StringJoiner(", ");
    for (String key : sortedKeys) {
      stringJoiner.add(encode(key) + "=" + "\"" + encode(oauthParamMap.get(key)) + "\"");
    }

    return "OAuth " + stringJoiner.toString();
  }

  /**
   * Builds the signature base string {@code METHOD&encoded-url&encoded-parameter-string}.
   *
   * @param method HTTP method, used as given
   * @param request_url scheme, host and path of the request, without query string
   * @param signature_parameter_string output of {@link #generateSignatureParameterString(Map)}
   * @return signature base string
   */
  public static String generateSignatureBaseString(String method, String request_url, String signature_parameter_string) {
    return method + "&" + encode(request_url) + "&" + encode(signature_parameter_string);
  }

  /**
   * Joins already-encoded parameters as {@code key=value} pairs, sorted by key
   * and separated by {@code &}.
   *
   * @param allParamsMap encoded parameters
   * @return parameter string
   */
  public static String generateSignatureParameterString(Map<String, String> allParamsMap) {

    SortedSet<String> sortedKeys = new TreeSet<>(allParamsMap.keySet());

    StringJoiner stringJoiner = new StringJoiner("&");
    for (String key : sortedKeys) {
      stringJoiner.add(key + "=" + allParamsMap.get(key));
    }

    return stringJoiner.toString();
  }

  /**
   * Percent-encodes a value for use in an OAuth signature.
   *
   * <p>Starts from {@link URLEncoder} output and then rewrites {@code +} to
   * {@code %20}, {@code *} to {@code %2A} and {@code %7E} back to {@code ~}.
   *
   * @param value text to encode; must not be null
   * @return encoded text
   */
  public static String encode(String value) {
    final String encoded;
    try {
      encoded = URLEncoder.encode(value, oauth_signature_encoding);
    } catch (UnsupportedEncodingException ex) {
      // Previously swallowed, which led to a NullPointerException below.
      throw new IllegalStateException(oauth_signature_encoding + " is not supported", ex);
    }
    // In URLEncoder output '+' only stands for a space, '*' is left literal and
    // '%' only starts an escape, so plain replacements are unambiguous.
    return encoded
        .replace("+", "%20")
        .replace("*", "%2A")
        .replace("%7E", "~");
  }

  /**
   * Computes the Base64-encoded HMAC-SHA1 of {@code baseString} keyed with {@code keyString}.
   *
   * @param baseString signature base string
   * @param keyString signing key
   * @return Base64 text of the MAC
   * @throws GeneralSecurityException if HmacSHA1 is unavailable or the key is rejected
   * @throws UnsupportedEncodingException never thrown by this implementation; kept in the
   *         signature so existing callers keep compiling
   */
  public static String computeSignature(String baseString, String keyString) throws GeneralSecurityException, UnsupportedEncodingException {
    SecretKey secretKey = new SecretKeySpec(keyString.getBytes(StandardCharsets.UTF_8), "HmacSHA1");

    Mac mac = Mac.getInstance("HmacSHA1");
    mac.init(secretKey);

    byte[] text = baseString.getBytes(StandardCharsets.UTF_8);

    return base64Encoder.encodeToString(mac.doFinal(text));
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.streams.twitter.api;

import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy;
import org.apache.streams.util.api.requests.backoff.BackOffException;
import org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrategy;

import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.protocol.HttpContext;
import org.apache.juneau.rest.client.RetryOn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Handle expected and unexpected exceptions.
 *
 * <p>Tells the REST client whether a response status code warrants another attempt.
 */
public class TwitterRetryHandler implements RetryOn {

  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterRetryHandler.class);

  // Not used yet; reserved for the back-off TODO below.
  private static AbstractBackOffStrategy backoff_strategy;

  // TODO: once request context is available, we can maintain multiple BackoffStrategy one per
  // request path / params, e.g. a Map<String, AbstractBackOffStrategy> held in a ConcurrentHashMap.
  //
  // TODO: juneau 6.3.x-incubating - onCode could lazily create a
  // LinearTimeBackOffStrategy(retrySleepMs / 1000, retryMax), call backOff() before answering
  // true for codes above 400, and reset the strategy and answer false once a BackOffException
  // signals that the attempts are exhausted.
  //
  // For reference, this is everything we used to check via twitter4j to decide whether to retry
  // (the value is the number of attempts charged against retryMax):
  //   - rate limit exceeded: sleep until the limit resets, charge 1
  //   - network issue: sleep for the retry interval, charge 1
  //   - error message containing "does not exist": charge retryMax
  //   - any other error message: charge retryMax / 3
  //   - exception code "ced778ef-0c669ac0" (known issue, data is never retrievable): charge retryMax / 3
  //   - exception code "4be80492-0a7bf7c7" (401, credentials lack access): charge retryMax
  //   - any other TwitterException: charge retryMax / 10
  //   - RuntimeException or anything else: charge retryMax / 3

  /**
   * Decides whether a response should be retried.
   *
   * @param httpResponseCode HTTP status code of the response
   * @return {@code true} for every code strictly greater than 400, {@code false} otherwise
   */
  @Override
  public boolean onCode(int httpResponseCode) {
    return httpResponseCode > 400;
  }
}
file mode 100644 index 0000000..5de9046 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Users.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.twitter.api; + +import org.apache.streams.twitter.pojo.Tweet; +import org.apache.streams.twitter.pojo.User; + +import java.util.List; + +/** + * Twitter REST API operations that return user profiles: users/lookup and users/show. + */ +public interface Users { + + /** + * Returns fully-hydrated user objects for up to 100 users per request, as specified by comma-separated values passed to the user_id and/or screen_name parameters. + * + * @param parameters {@link org.apache.streams.twitter.api.UsersLookupRequest} + * @return List<User> + * @see <a href="https://dev.twitter.com/rest/reference/get/users/lookup">https://dev.twitter.com/rest/reference/get/users/lookup</a> + * + */ + public List<User> lookup(UsersLookupRequest parameters); + + /** + * Returns a variety of information about the user specified by the required user_id or screen_name parameter. The author's most recent Tweet will be returned inline when possible. 
+ * + * @param parameters {@link org.apache.streams.twitter.api.UsersShowRequest} + * @return User + * @see <a href="https://dev.twitter.com/rest/reference/get/users/show">https://dev.twitter.com/rest/reference/get/users/show</a> + * + */ + public User show(UsersShowRequest parameters); + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java index 6392e0d..6db16be 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java @@ -126,7 +126,7 @@ public class TwitterActivityUtil { } /** - * Builds the activity {@link ActivityObject} actor from the tweet + * Builds the activity {@link ActivityObject} actor from the tweet. * @param tweet the object to use as the source * @return a valid Actor populated from the Tweet */ @@ -138,7 +138,7 @@ public class TwitterActivityUtil { } /** - * Builds the activity {@link ActivityObject} actor from the User + * Builds the activity {@link ActivityObject} actor from the User. * @param user the object to use as the source * @return a valid Actor populated from the Tweet */ @@ -186,7 +186,7 @@ public class TwitterActivityUtil { } /** - * Creates an {@link ActivityObject} for the tweet + * Creates an {@link ActivityObject} for the tweet. 
* @param tweet the object to use as the source * @return a valid ActivityObject */ http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java index 89a5abd..a170626 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java @@ -28,23 +28,20 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.twitter.TwitterConfiguration; import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.twitter.api.StatusesShowRequest; +import org.apache.streams.twitter.api.Twitter; import org.apache.streams.twitter.converter.TwitterDocumentClassifier; import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; -import org.apache.streams.twitter.provider.TwitterProviderUtil; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import twitter4j.Status; -import twitter4j.Twitter; -import twitter4j.TwitterException; -import twitter4j.TwitterFactory; -import twitter4j.TwitterObjectFactory; -import twitter4j.conf.ConfigurationBuilder; +import java.io.IOException; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; import 
java.util.stream.Stream; @@ -89,7 +86,13 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor { Activity doc = (Activity)entry.getDocument(); String originalId = doc.getId(); if (PROVIDER_ID.equals(doc.getProvider().getId())) { - fetchAndReplace(doc, originalId); + try { + fetchAndReplace(doc, originalId); + } catch (ActivityConversionException ex) { + LOGGER.warn("ActivityConversionException", ex); + } catch (IOException ex) { + LOGGER.warn("IOException", ex); + } } } else { throw new IllegalStateException("Requires an activity document"); @@ -100,8 +103,14 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor { @Override public void prepare(Object configurationObject) { - this.client = getTwitterClient(); + try { + client = getTwitterClient(); + } catch (InstantiationException e) { + LOGGER.error("InstantiationException", e); + } + Objects.requireNonNull(client); this.mapper = StreamsJacksonMapper.getInstance(); + Objects.requireNonNull(mapper); } @Override @@ -109,22 +118,14 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor { } - protected void fetchAndReplace(Activity doc, String originalId) { - try { - String json = fetch(doc); - replace(doc, json); - doc.setId(originalId); - retryCount = 0; - } catch (TwitterException tw) { - if (tw.exceededRateLimitation()) { - sleepAndTryAgain(doc, originalId); - } - } catch (Exception ex) { - LOGGER.warn("Error fetching and replacing tweet for activity {}", doc.getId()); - } + protected void fetchAndReplace(Activity doc, String originalId) throws java.io.IOException, ActivityConversionException { + Tweet tweet = fetch(doc); + replace(doc, tweet); + doc.setId(originalId); } - protected void replace(Activity doc, String json) throws java.io.IOException, ActivityConversionException { + protected void replace(Activity doc, Tweet tweet) throws java.io.IOException, ActivityConversionException { + String json = mapper.writeValueAsString(tweet); Class 
documentSubType = new TwitterDocumentClassifier().detectClasses(json).get(0); Object object = mapper.readValue(json, documentSubType); @@ -137,52 +138,38 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor { } } - protected String fetch(Activity doc) throws TwitterException { + protected Tweet fetch(Activity doc) { String id = doc.getObject().getId(); LOGGER.debug("Fetching status from Twitter for {}", id); Long tweetId = Long.valueOf(id.replace("id:twitter:tweets:", "")); - Status status = getTwitterClient().showStatus(tweetId); - return TwitterObjectFactory.getRawJSON(status); + Tweet tweet = client.show( + new StatusesShowRequest() + .withId(tweetId) + ); + return tweet; } - protected Twitter getTwitterClient() { - - if (this.client == null) { + protected Twitter getTwitterClient() throws InstantiationException { - String baseUrl = TwitterProviderUtil.baseUrl(config); + return Twitter.getInstance(config); - ConfigurationBuilder builder = new ConfigurationBuilder() - .setOAuthConsumerKey(config.getOauth().getConsumerKey()) - .setOAuthConsumerSecret(config.getOauth().getConsumerSecret()) - .setOAuthAccessToken(config.getOauth().getAccessToken()) - .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret()) - .setIncludeEntitiesEnabled(true) - .setJSONStoreEnabled(true) - .setAsyncNumThreads(1) - .setRestBaseURL(baseUrl) - .setIncludeMyRetweetEnabled(Boolean.TRUE) - .setPrettyDebugEnabled(Boolean.TRUE); - - this.client = new TwitterFactory(builder.build()).getInstance(); - } - return this.client; } //Hardcore sleep to allow for catch up - protected void sleepAndTryAgain(Activity doc, String originalId) { - try { - //Attempt to fetchAndReplace with a backoff up to the limit then just reset the count and let the process continue - if (retryCount < MAX_ATTEMPTS) { - retryCount++; - LOGGER.debug("Sleeping for {} min due to excessive calls to Twitter API", (retryCount * 4)); - Thread.sleep(BACKOFF * retryCount); - fetchAndReplace(doc, 
originalId); - } else { - retryCount = 0; - } - } catch (InterruptedException ex) { - LOGGER.warn("Thread sleep interrupted while waiting for twitter backoff"); - } - } +// protected void sleepAndTryAgain(Activity doc, String originalId) { +// try { +// //Attempt to fetchAndReplace with a backoff up to the limit then just reset the count and let the process continue +// if (retryCount < MAX_ATTEMPTS) { +// retryCount++; +// LOGGER.debug("Sleeping for {} min due to excessive calls to Twitter API", (retryCount * 4)); +// Thread.sleep(BACKOFF * retryCount); +// fetchAndReplace(doc, originalId); +// } else { +// retryCount = 0; +// } +// } catch (InterruptedException ex) { +// LOGGER.warn("Thread sleep interrupted while waiting for twitter backoff"); +// } +// } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java deleted file mode 100644 index ec43fba..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.provider; - -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.twitter.TwitterConfiguration; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import twitter4j.RateLimitStatus; -import twitter4j.Twitter; -import twitter4j.TwitterException; - -/** - * Handle expected and unexpected exceptions. - */ -public class TwitterErrorHandler { - - private static final Logger LOGGER = LoggerFactory.getLogger(TwitterErrorHandler.class); - - // selected because 3 * 5 + n >= 15 for positive n - protected static long retry = - new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration( - StreamsConfigurator.getConfig().getConfig("twitter") - ).getRetrySleepMs(); - protected static long retryMax = - new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration( - StreamsConfigurator.getConfig().getConfig("twitter") - ).getRetryMax(); - - @Deprecated - public static int handleTwitterError(Twitter twitter, Exception exception) { - return handleTwitterError( twitter, null, exception); - } - - /** - * handleTwitterError. 
- * @param twitter Twitter - * @param id id - * @param exception exception - * @return - */ - public static int handleTwitterError(Twitter twitter, Long id, Exception exception) { - - if (exception instanceof TwitterException) { - TwitterException twitterException = (TwitterException)exception; - - if (twitterException.exceededRateLimitation()) { - - long millisUntilReset = retry; - - final RateLimitStatus rateLimitStatus = twitterException.getRateLimitStatus(); - if (rateLimitStatus != null) { - millisUntilReset = rateLimitStatus.getSecondsUntilReset() * 1000; - } - - LOGGER.warn("Rate Limit Exceeded. Will retry in {} seconds...", millisUntilReset / 1000); - - try { - Thread.sleep(millisUntilReset); - } catch (InterruptedException e1) { - Thread.currentThread().interrupt(); - } - - return 1; - } else if (twitterException.isCausedByNetworkIssue()) { - LOGGER.info("Twitter Network Issues Detected. Backing off..."); - LOGGER.info("{} - {}", twitterException.getExceptionCode(), twitterException.getLocalizedMessage()); - try { - Thread.sleep(retry); - } catch (InterruptedException e1) { - Thread.currentThread().interrupt(); - } - return 1; - } else if (twitterException.isErrorMessageAvailable()) { - if (twitterException.getMessage().toLowerCase().contains("does not exist")) { - if ( id != null ) { - LOGGER.warn("User does not exist: {}", id); - } else { - LOGGER.warn("User does not exist"); - } - return (int)retryMax; - } else { - return (int)retryMax / 3; - } - } else { - if (twitterException.getExceptionCode().equals("ced778ef-0c669ac0")) { - // This is a known weird issue, not exactly sure the cause, but you'll never be able to get the data. - return (int)retryMax / 3; - } else if (twitterException.getExceptionCode().equals("4be80492-0a7bf7c7")) { - // This is a 401 reflecting credentials don't have access to the requested resource. 
- if ( id != null ) { - LOGGER.warn("Authentication Exception accessing id: {}", id); - } else { - LOGGER.warn("Authentication Exception"); - } - return (int)retryMax; - } else { - LOGGER.warn("Unknown Twitter Exception..."); - LOGGER.warn(" Account: {}", twitter); - LOGGER.warn(" Access: {}", twitterException.getAccessLevel()); - LOGGER.warn(" Code: {}", twitterException.getExceptionCode()); - LOGGER.warn(" Message: {}", twitterException.getLocalizedMessage()); - return (int)retryMax / 10; - } - } - } else if (exception instanceof RuntimeException) { - LOGGER.warn("TwitterGrabber: Unknown Runtime Error", exception.getMessage()); - return (int)retryMax / 3; - } else { - LOGGER.info("Completely Unknown Exception: {}", exception); - return (int)retryMax / 3; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java new file mode 100644 index 0000000..b9448e6 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.twitter.provider; + +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.twitter.api.FollowersIdsRequest; +import org.apache.streams.twitter.api.FollowersIdsResponse; +import org.apache.streams.twitter.api.Twitter; +import org.apache.streams.twitter.pojo.Follow; +import org.apache.streams.twitter.pojo.User; +import org.apache.streams.util.ComponentUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Retrieve friend or follower connections for a single user id. + */ +public class TwitterFollowersIdsProviderTask implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersIdsProviderTask.class); + + private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + protected Twitter client; + protected TwitterFollowingProvider provider; + protected FollowersIdsRequest request; + + private int count = 0; + + /** + * TwitterFollowingProviderTask constructor. 
+ * @param provider TwitterFollowingProvider + * @param twitter Twitter + * @param request FollowersIdsRequest + */ + public TwitterFollowersIdsProviderTask(TwitterFollowingProvider provider, Twitter twitter, FollowersIdsRequest request) { + this.provider = provider; + this.client = twitter; + this.request = request; + } + + @Override + public void run() { + + Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null); + + LOGGER.info("Thread Starting: {}", request.toString()); + + getFollowersIds(request); + + LOGGER.info("Thread Finished: {}", request.toString()); + + } + + int last_count = 0; + int page_count = 1; + int item_count = 0; + long cursor = 0; + + private void getFollowersIds(FollowersIdsRequest request) { + + do { + + FollowersIdsResponse response = client.ids(request); + + last_count = response.getIds().size(); + + if (response.getIds().size() > 0) { + + for (Long id : response.getIds()) { + + Follow follow = new Follow() + .withFollowee( + new User() + .withId(request.getId()) + .withScreenName(request.getScreenName())) + .withFollower( + new User() + .withId(id)); + + if (item_count < provider.getConfig().getMaxItems()) { + ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue); + item_count++; + } + + } + + } + page_count++; + cursor = response.getNextCursor(); + request.setCursor(cursor); + } + while (shouldContinuePulling(cursor, last_count, page_count, item_count)); + } + + public boolean shouldContinuePulling(long cursor, int count, int page_count, int item_count) { + return ( + cursor > 0 + && count > 0 + && item_count < provider.getConfig().getMaxItems() + && page_count <= provider.getConfig().getMaxPages()); + } + +}