Repository: incubator-streams Updated Branches: refs/heads/master 104f29b1e -> dd58c877b
start level-up gplus providers Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b7ee7d4d Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b7ee7d4d Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b7ee7d4d Branch: refs/heads/master Commit: b7ee7d4d9845d15685ae6b9e5b8a332bbc18f891 Parents: 3234cdb Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Fri Oct 14 09:33:44 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Fri Oct 14 09:33:44 2016 -0500 ---------------------------------------------------------------------- .../google-gplus/pom.xml | 9 ++- .../provider/GPlusUserActivityProvider.java | 56 ++++++++++++++++++ .../gplus/provider/GPlusUserDataProvider.java | 60 ++++++++++++++++++++ .../providers/GPlusUserActivityProviderIT.java | 52 +++++++++++++++++ .../test/providers/GPlusUserDataProviderIT.java | 52 +++++++++++++++++ .../resources/GPlusUserActivityProviderIT.conf | 22 +++++++ .../test/resources/GPlusUserDataProviderIT.conf | 22 +++++++ 7 files changed, 270 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7ee7d4d/streams-contrib/streams-provider-google/google-gplus/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/pom.xml b/streams-contrib/streams-provider-google/google-gplus/pom.xml index e99fb5f..1856ff9 100644 --- a/streams-contrib/streams-provider-google/google-gplus/pom.xml +++ b/streams-contrib/streams-provider-google/google-gplus/pom.xml @@ -30,8 +30,11 @@ <description>Google+ Provider</description> <properties> + <google.client.version>1.22.0</google.client.version> + <gplus.client.version>v1-rev457-1.22.0</gplus.client.version> <skipITs>true</skipITs> <testDataBaseURl>http://streams.peoplepattern.com.s3.amazonaws.com/test-data/</testDataBaseURl> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <repositories> @@ -94,17 +97,17 @@ <dependency> <groupId>com.google.apis</groupId> <artifactId>google-api-services-plus</artifactId> - <version>v1-rev184-1.19.0</version> + <version>${gplus.client.version}</version> </dependency> <dependency> <groupId>com.google.api-client</groupId> <artifactId>google-api-client</artifactId> - <version>1.17.0-rc</version> + <version>${google.client.version}</version> </dependency> <dependency> <groupId>com.google.http-client</groupId> <artifactId>google-http-client-jackson2</artifactId> - <version>1.17.0-rc</version> + <version>${google.client.version}</version> <exclusions> <exclusion> <groupId>commons-logging</groupId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7ee7d4d/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java index 0ab75e6..c890cc1 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java @@ -19,12 +19,27 @@ package com.google.gplus.provider; import com.google.api.services.plus.Plus; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Uninterruptibles; +import com.google.gson.Gson; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.google.gplus.GPlusConfiguration; import org.apache.streams.google.gplus.configuration.UserInfo; import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.Iterator; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; /** * @@ -50,4 +65,45 @@ public class GPlusUserActivityProvider extends AbstractGPlusProvider{ protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) { return new GPlusUserActivityCollector(plus, queue, strategy, userInfo); } + + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File conf_file = new File(configfile); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + GPlusConfiguration config = new ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, "gplus"); + GPlusUserActivityProvider provider = new GPlusUserActivityProvider(config); + + Gson gson = new Gson(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while(iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + if (datum.getDocument() instanceof String) + json = (String) datum.getDocument(); + else + json = gson.toJson(datum.getDocument()); + json = gson.toJson(datum.getDocument()); + outStream.println(json); + } + } while( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7ee7d4d/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java index 2effdea..e264318 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java @@ -18,13 +18,32 @@ package com.google.gplus.provider; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.services.plus.Plus; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; +import com.google.gson.Gson; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.google.gplus.GPlusConfiguration; import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.Iterator; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; /** * @@ -50,4 +69,45 @@ public class GPlusUserDataProvider extends AbstractGPlusProvider{ protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) { return new GPlusUserDataCollector(plus, strategy, queue, userInfo); } + + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File conf_file = new File(configfile); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + GPlusConfiguration config = new ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, "gplus"); + GPlusUserDataProvider provider = new GPlusUserDataProvider(config); + + Gson gson = new Gson(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while(iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + if (datum.getDocument() instanceof String) + json = (String) datum.getDocument(); + else + json = gson.toJson(datum.getDocument()); + json = gson.toJson(datum.getDocument()); + outStream.println(json); + } + } while( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7ee7d4d/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java new file mode 100644 index 0000000..9fc470e --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.gplus.test.providers; + +import com.google.common.collect.Lists; +import com.google.gplus.provider.GPlusUserActivityProvider; +import org.junit.Test; + +import java.io.File; +import java.io.FileReader; +import java.io.LineNumberReader; + +public class GPlusUserActivityProviderIT { + + @Test + public void testGPlusUserActivityProvider() throws Exception { + + String configfile = "./target/test-classes/GPlusUserActivityProviderIT.conf"; + String outfile = "./target/test-classes/GPlusUserActivityProviderIT.stdout.txt"; + + GPlusUserActivityProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); + + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); + + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); + + while(outCounter.readLine() != null) {} + + assert (outCounter.getLineNumber() >= 1); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7ee7d4d/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java new file mode 100644 index 0000000..574dcc1 --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserDataProviderIT.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.gplus.test.providers; + +import com.google.common.collect.Lists; +import com.google.gplus.provider.GPlusUserDataProvider; +import org.junit.Test; + +import java.io.File; +import java.io.FileReader; +import java.io.LineNumberReader; + +public class GPlusUserDataProviderIT { + + @Test + public void testGPlusUserDataProvider() throws Exception { + + String configfile = "./target/test-classes/GPlusUserDataProviderIT.conf"; + String outfile = "./target/test-classes/GPlusUserDataProviderIT.stdout.txt"; + + GPlusUserDataProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); + + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); + + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); + + while(outCounter.readLine() != null) {} + + assert (outCounter.getLineNumber() > 1); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7ee7d4d/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserActivityProviderIT.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserActivityProviderIT.conf b/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserActivityProviderIT.conf new file mode 100644 index 0000000..6c8ecc1 --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserActivityProviderIT.conf @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +gplus { + gplusUsers = [ + { + userId = "+apache" + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7ee7d4d/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserDataProviderIT.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserDataProviderIT.conf b/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserDataProviderIT.conf new file mode 100644 index 0000000..6c8ecc1 --- /dev/null +++ b/streams-contrib/streams-provider-google/google-gplus/src/test/resources/GPlusUserDataProviderIT.conf @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +gplus { + gplusUsers = [ + { + userId = "+apache" + } + ] +} \ No newline at end of file
