http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java index 5cc6fe7..e11628f 100644 --- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java +++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java @@ -18,6 +18,13 @@ package com.google.gmail.provider; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; + import com.google.common.base.Preconditions; import com.google.gmail.GMailConfiguration; import com.googlecode.gmail4j.GmailClient; @@ -26,12 +33,7 @@ import com.googlecode.gmail4j.http.HttpGmailConnection; import com.googlecode.gmail4j.javamail.ImapGmailClient; import com.googlecode.gmail4j.javamail.ImapGmailConnection; import com.googlecode.gmail4j.rss.RssGmailClient; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.DatumStatusCounter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; + import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,141 +52,141 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** - * Created by sblackmon on 12/10/13. + * GMailProvider collects messages from GMail. */ public class GMailProvider implements StreamsProvider, Serializable { - public final static String STREAMS_ID = "GMailProvider"; - - private final static Logger LOGGER = LoggerFactory.getLogger(GMailProvider.class); + public final static String STREAMS_ID = "GMailProvider"; - private GMailConfiguration config; + private final static Logger LOGGER = LoggerFactory.getLogger(GMailProvider.class); - private Class klass; + private GMailConfiguration config; - public GMailConfiguration getConfig() { - return config; - } + private Class klass; - public void setConfig(GMailConfiguration config) { - this.config = config; - } + public GMailConfiguration getConfig() { + return config; + } - protected BlockingQueue inQueue = new LinkedBlockingQueue<>(10000); + public void setConfig(GMailConfiguration config) { + this.config = config; + } - protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>(); - protected Future task; + protected BlockingQueue inQueue = new LinkedBlockingQueue<>(10000); - public BlockingQueue<Object> getInQueue() { - return inQueue; - } + protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>(); + protected Future task; - protected GmailClient rssClient; - protected ImapGmailClient imapClient; + public BlockingQueue<Object> getInQueue() { + return inQueue; + } - private ExecutorService executor; + protected GmailClient rssClient; + protected ImapGmailClient imapClient; - private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { - return new ThreadPoolExecutor(nThreads, nThreads, - 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); - } + private ExecutorService executor; - public GMailProvider() { - this.config = new ComponentConfigurator<>(GMailConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail")); - } + private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { + return new ThreadPoolExecutor(nThreads, nThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } - public GMailProvider(GMailConfiguration config) { - this.config = config; - } + public GMailProvider() { + this.config = new ComponentConfigurator<>(GMailConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail")); + } - public GMailProvider(Class klass) { - this.config = new ComponentConfigurator<>(GMailConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail")); - this.klass = klass; - } + public GMailProvider(GMailConfiguration config) { + this.config = config; + } - public GMailProvider(GMailConfiguration config, Class klass) { - this.config = config; - this.klass = klass; - } + public GMailProvider(Class klass) { + this.config = new ComponentConfigurator<>(GMailConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail")); + this.klass = klass; + } - protected DatumStatusCounter countersTotal = new DatumStatusCounter(); - protected DatumStatusCounter countersCurrent = new DatumStatusCounter(); + public GMailProvider(GMailConfiguration config, Class klass) { + this.config = config; + this.klass = klass; + } - @Override - public String getId() { - return "GMailProvider"; - } + protected DatumStatusCounter countersTotal = new DatumStatusCounter(); + protected DatumStatusCounter countersCurrent = new DatumStatusCounter(); - @Override - public void startStream() { + @Override + public String getId() { + return "GMailProvider"; + } - task = executor.submit(new GMailImapProviderTask(this)); + @Override + public void startStream() { - } + task = executor.submit(new GMailImapProviderTask(this)); - @Override - public StreamsResultSet readCurrent() { + } - StreamsResultSet current; + @Override + public StreamsResultSet readCurrent() { - synchronized( GMailProvider.class ) { - current = new StreamsResultSet(new ConcurrentLinkedQueue<>(providerQueue)); - current.setCounter(new DatumStatusCounter()); - current.getCounter().add(countersCurrent); - countersTotal.add(countersCurrent); - countersCurrent = new DatumStatusCounter(); - providerQueue.clear(); - } + StreamsResultSet current; - return current; + synchronized( GMailProvider.class ) { + current = new StreamsResultSet(new ConcurrentLinkedQueue<>(providerQueue)); + current.setCounter(new DatumStatusCounter()); + current.getCounter().add(countersCurrent); + countersTotal.add(countersCurrent); + countersCurrent = new DatumStatusCounter(); + providerQueue.clear(); } - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; - } + return current; + } - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; - } + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } - @Override - public boolean isRunning() { - return !task.isDone() && !task.isCancelled(); - } + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } - @Override - public void prepare(Object configurationObject) { + @Override + public boolean isRunning() { + return !task.isDone() && !task.isCancelled(); + } - Preconditions.checkNotNull(this.klass); + @Override + public void prepare(Object configurationObject) { - Preconditions.checkNotNull(config.getUserName()); - Preconditions.checkNotNull(config.getPassword()); + Preconditions.checkNotNull(this.klass); - rssClient = new RssGmailClient(); - GmailConnection rssConnection = new HttpGmailConnection(config.getUserName(), config.getPassword().toCharArray()); - rssClient.setConnection(rssConnection); + Preconditions.checkNotNull(config.getUserName()); + Preconditions.checkNotNull(config.getPassword()); - imapClient = new ImapGmailClient(); - GmailConnection imapConnection = new ImapGmailConnection(); - imapConnection.setLoginCredentials(config.getUserName(), config.getPassword().toCharArray()); - imapClient.setConnection(imapConnection); + rssClient = new RssGmailClient(); + GmailConnection rssConnection = new HttpGmailConnection(config.getUserName(), config.getPassword().toCharArray()); + rssClient.setConnection(rssConnection); - executor = Executors.newSingleThreadExecutor(); + imapClient = new ImapGmailClient(); + GmailConnection imapConnection = new ImapGmailConnection(); + imapConnection.setLoginCredentials(config.getUserName(), config.getPassword().toCharArray()); + imapClient.setConnection(imapConnection); - startStream(); - } + executor = Executors.newSingleThreadExecutor(); + + startStream(); + } - @Override - public void cleanUp() { - try { - executor.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } + @Override + public void cleanUp() { + try { + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); } + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java deleted file mode 100644 index 6fbfd83..0000000 --- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gmail.provider; - -import com.googlecode.gmail4j.GmailMessage; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.util.ComponentUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * Created by sblackmon on 12/10/13. - */ -public class GMailRssProviderTask implements Runnable { - - private final static Logger LOGGER = LoggerFactory.getLogger(GMailRssProviderTask.class); - - private GMailProvider provider; - - public GMailRssProviderTask(GMailProvider provider) { - this.provider = provider; - } - - @Override - public void run() { - - final List<GmailMessage> messages = this.provider.rssClient.getUnreadMessages(); - for (GmailMessage message : messages) { - - StreamsDatum entry = new StreamsDatum(message); - - ComponentUtils.offerUntilSuccess(entry, this.provider.providerQueue); - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java b/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java index 13fa25a..2da9e82 100644 --- a/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java +++ b/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java @@ -20,6 +20,7 @@ package com.google.gmail.test; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; + import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -31,37 +32,37 @@ import java.io.InputStream; import java.io.InputStreamReader; /** - * Tests conversion of gplus inputs to Activity + * Tests conversion of gmail inputs to Activity */ @Ignore("ignore until test resources are available.") public class GMailMessageSerDeTest { - private final static Logger LOGGER = LoggerFactory.getLogger(GMailMessageSerDeTest.class); + private final static Logger LOGGER = LoggerFactory.getLogger(GMailMessageSerDeTest.class); - private ObjectMapper mapper = new ObjectMapper(); + private ObjectMapper mapper = new ObjectMapper(); - @Ignore - @Test - public void Tests() - { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); + @Ignore + @Test + public void Tests() + { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); - InputStream is = GMailMessageSerDeTest.class.getResourceAsStream("/datasift_jsons.txt"); - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); + InputStream is = GMailMessageSerDeTest.class.getResourceAsStream("/datasift_jsons.txt"); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); - try { - while (br.ready()) { - String line = br.readLine(); - LOGGER.debug(line); + try { + while (br.ready()) { + String line = br.readLine(); + LOGGER.debug(line); - // implement - } - } catch( Exception e ) { - e.printStackTrace(); - Assert.fail(); - } + // implement + } + } catch( Exception e ) { + e.printStackTrace(); + Assert.fail(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java index d926541..833fe23 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java @@ -19,71 +19,78 @@ package com.google.gplus.processor; -import com.google.api.services.plus.model.Comment; -import com.google.gplus.serializer.util.GooglePlusActivityUtil; -import java.util.ArrayList; -import java.util.List; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.pojo.json.Activity; + +import com.google.api.services.plus.model.Comment; +import com.google.gplus.serializer.util.GooglePlusActivityUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; + +/** + * GooglePlusCommentProcessor collects comments about a google plus activity. + */ public class GooglePlusCommentProcessor implements StreamsProcessor { - private final static String STREAMS_ID = "GooglePlusCommentProcessor"; - private final static Logger LOGGER = LoggerFactory.getLogger(GooglePlusCommentProcessor.class); - private GooglePlusActivityUtil googlePlusActivityUtil; - private int count; - - @Override - public String getId() { - return STREAMS_ID; - } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - StreamsDatum result = null; - - try { - Object item = entry.getDocument(); - LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); - - //Get G+ activity ID from our own activity ID - if (item instanceof Activity) { - Activity activity = (Activity) item; - String activityId = getGPlusID(activity.getId()); - - //Call Google Plus API to get list of comments for this activity ID - /* TODO: FILL ME OUT WITH THE API CALL **/ - List<Comment> comments = new ArrayList<>(); - - googlePlusActivityUtil.updateActivity(comments, activity); - result = new StreamsDatum(activity); - } - } catch (Exception e) { - e.printStackTrace(); - LOGGER.error("Exception while converting Comment to Activity: {}", e.getMessage()); - } - - if( result != null ) - return com.google.common.collect.Lists.newArrayList(result); - else - return new ArrayList<>(); - } + private static final String STREAMS_ID = "GooglePlusCommentProcessor"; + private static final Logger LOGGER = LoggerFactory.getLogger(GooglePlusCommentProcessor.class); + private GooglePlusActivityUtil googlePlusActivityUtil; + private int count; - @Override - public void prepare(Object configurationObject) { - googlePlusActivityUtil = new GooglePlusActivityUtil(); - count = 0; - } + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + StreamsDatum result = null; + + try { + Object item = entry.getDocument(); + LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); - @Override - public void cleanUp() { + //Get G+ activity ID from our own activity ID + if (item instanceof Activity) { + Activity activity = (Activity) item; + String activityId = getGPlusID(activity.getId()); + //Call Google Plus API to get list of comments for this activity ID + /* TODO: FILL ME OUT WITH THE API CALL **/ + List<Comment> comments = new ArrayList<>(); + + googlePlusActivityUtil.updateActivity(comments, activity); + result = new StreamsDatum(activity); + } + } catch (Exception ex) { + ex.printStackTrace(); + LOGGER.error("Exception while converting Comment to Activity: {}", ex.getMessage()); } - private String getGPlusID(String activityID) { - String[] activityParts = activityID.split(":"); - return (activityParts.length > 0) ? activityParts[activityParts.length - 1] : ""; + if ( result != null ) { + return com.google.common.collect.Lists.newArrayList(result); + } else { + return new ArrayList<>(); } + } + + @Override + public void prepare(Object configurationObject) { + googlePlusActivityUtil = new GooglePlusActivityUtil(); + count = 0; + } + + @Override + public void cleanUp() { + + } + + private String getGPlusID(String activityId) { + String[] activityParts = activityId.split(":"); + return (activityParts.length > 0) ? activityParts[activityParts.length - 1] : ""; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java index d44a487..fe4d5da 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java @@ -18,7 +18,11 @@ package com.google.gplus.processor; -import com.fasterxml.jackson.databind.JsonNode; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; + import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.api.services.plus.model.Person; import com.google.common.collect.Lists; @@ -26,112 +30,112 @@ import com.google.gplus.serializer.util.GPlusActivityDeserializer; import com.google.gplus.serializer.util.GPlusEventClassifier; import com.google.gplus.serializer.util.GPlusPersonDeserializer; import com.google.gplus.serializer.util.GooglePlusActivityUtil; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.node.ObjectNode; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.Queue; +/** + * GooglePlusTypeConverter is a StreamsProcessor that converts gplus activities to activitystreams activities. + */ public class GooglePlusTypeConverter implements StreamsProcessor { - public final static String STREAMS_ID = "GooglePlusTypeConverter"; - private final static Logger LOGGER = LoggerFactory.getLogger(GooglePlusTypeConverter.class); - private StreamsJacksonMapper mapper; - private Queue<Person> inQueue; - private Queue<StreamsDatum> outQueue; - private GooglePlusActivityUtil googlePlusActivityUtil; - private int count = 0; - - public GooglePlusTypeConverter() {} - - public Queue<StreamsDatum> getProcessorOutputQueue() { - return outQueue; + public static final String STREAMS_ID = "GooglePlusTypeConverter"; + + private static final Logger LOGGER = LoggerFactory.getLogger(GooglePlusTypeConverter.class); + private StreamsJacksonMapper mapper; + private Queue<Person> inQueue; + private Queue<StreamsDatum> outQueue; + private GooglePlusActivityUtil googlePlusActivityUtil; + private int count = 0; + + public GooglePlusTypeConverter() {} + + public Queue<StreamsDatum> getProcessorOutputQueue() { + return outQueue; + } + + public void setProcessorInputQueue(Queue<Person> inputQueue) { + inQueue = inputQueue; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + StreamsDatum result = null; + + try { + Object item = entry.getDocument(); + + LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); + Activity activity = null; + + if (item instanceof String) { + item = deserializeItem(item); + } + + if (item instanceof Person) { + activity = new Activity(); + googlePlusActivityUtil.updateActivity((Person)item, activity); + } else if (item instanceof com.google.api.services.plus.model.Activity) { + activity = new Activity(); + googlePlusActivityUtil.updateActivity((com.google.api.services.plus.model.Activity)item, activity); + } + + if (activity != null) { + result = new StreamsDatum(activity); + count++; + } + } catch (Exception ex) { + ex.printStackTrace(); + LOGGER.error("Exception while converting Person to Activity: {}", ex.getMessage()); } - public void setProcessorInputQueue(Queue<Person> inputQueue) { - inQueue = inputQueue; + if ( result != null ) { + return Lists.newArrayList(result); + } else { + return Lists.newArrayList(); } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - StreamsDatum result = null; - - try { - Object item = entry.getDocument(); - - LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); - Activity activity = null; - - if(item instanceof String) { - item = deserializeItem(item); - } - - if(item instanceof Person) { - activity = new Activity(); - googlePlusActivityUtil.updateActivity((Person)item, activity); - } else if(item instanceof com.google.api.services.plus.model.Activity) { - activity = new Activity(); - googlePlusActivityUtil.updateActivity((com.google.api.services.plus.model.Activity)item, activity); - } - - if(activity != null) { - result = new StreamsDatum(activity); - count++; - } - } catch (Exception e) { - e.printStackTrace(); - LOGGER.error("Exception while converting Person to Activity: {}", e.getMessage()); - } - - if( result != null ) - return Lists.newArrayList(result); - else - return Lists.newArrayList(); + } + + private Object deserializeItem(Object item) { + try { + Class klass = GPlusEventClassifier.detectClass((String) item); + + if (klass.equals(Person.class)) { + item = mapper.readValue((String) item, Person.class); + } else if (klass.equals(com.google.api.services.plus.model.Activity.class)) { + item = mapper.readValue((String) item, com.google.api.services.plus.model.Activity.class); + } + } catch (Exception ex) { + LOGGER.error("Exception while trying to deserializeItem: {}", ex); } - private Object deserializeItem(Object item) { - try { - Class klass = GPlusEventClassifier.detectClass((String) item); + return item; + } - if (klass.equals(Person.class)) { - item = mapper.readValue((String) item, Person.class); - } else if (klass.equals(com.google.api.services.plus.model.Activity.class)) { - item = mapper.readValue((String) item, com.google.api.services.plus.model.Activity.class); - } - } catch (Exception e) { - LOGGER.error("Exception while trying to deserializeItem: {}", e); - } + @Override + public void prepare(Object configurationObject) { + googlePlusActivityUtil = new GooglePlusActivityUtil(); + mapper = StreamsJacksonMapper.getInstance(); - return item; - } - - @Override - public void prepare(Object configurationObject) { - googlePlusActivityUtil = new GooglePlusActivityUtil(); - mapper = StreamsJacksonMapper.getInstance(); + SimpleModule simpleModule = new SimpleModule(); + simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer()); + mapper.registerModule(simpleModule); - SimpleModule simpleModule = new SimpleModule(); - simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer()); - mapper.registerModule(simpleModule); + simpleModule = new SimpleModule(); + simpleModule.addDeserializer(com.google.api.services.plus.model.Activity.class, new GPlusActivityDeserializer()); + mapper.registerModule(simpleModule); + } - simpleModule = new SimpleModule(); - simpleModule.addDeserializer(com.google.api.services.plus.model.Activity.class, new GPlusActivityDeserializer()); - mapper.registerModule(simpleModule); - } - - @Override - public void cleanUp() { - //No-op - } + @Override + public void cleanUp() { + //No-op + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java index 734e711..e08c571 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java @@ -18,6 +18,17 @@ package com.google.gplus.provider; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.google.gplus.GPlusConfiguration; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.util.ComponentUtils; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy; + import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets; import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; import com.google.api.client.http.HttpTransport; @@ -31,16 +42,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.google.gplus.GPlusConfiguration; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.util.ComponentUtils; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; -import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy; + import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,198 +63,202 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; /** - * Provider that creates a GPlus client and will run task that queue data to an outing queue + * Provider that creates a GPlus client and will run task that queue data to an outing queue. */ public abstract class AbstractGPlusProvider implements StreamsProvider { - public final static String STREAMS_ID = "AbstractGPlusProvider"; - - private final static Logger LOGGER = LoggerFactory.getLogger(AbstractGPlusProvider.class); - private final static Set<String> SCOPE = new HashSet<String>() {{ add("https://www.googleapis.com/auth/plus.login");}}; - private final static int MAX_BATCH_SIZE = 1000; + public static final String STREAMS_ID = "AbstractGPlusProvider"; - private static final HttpTransport TRANSPORT = new NetHttpTransport(); - private static final JacksonFactory JSON_FACTORY = new JacksonFactory(); - private static final Gson GSON = new Gson(); - - private GPlusConfiguration config; - - List<ListenableFuture<Object>> futures = new ArrayList<>(); - - private ListeningExecutorService executor; - - private BlockingQueue<StreamsDatum> datumQueue; - private BlockingQueue<Runnable> runnables; - private AtomicBoolean isComplete; - private boolean previousPullWasEmpty; - - protected GoogleClientSecrets clientSecrets; - protected GoogleCredential credential; - protected Plus plus; - - public AbstractGPlusProvider() { - this.config = new ComponentConfigurator<>(GPlusConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gplus")); + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGPlusProvider.class); + private static final Set<String> SCOPE = new HashSet<String>() { + { + add("https://www.googleapis.com/auth/plus.login"); } + }; + private static final int MAX_BATCH_SIZE = 1000; - public AbstractGPlusProvider(GPlusConfiguration config) { - this.config = config; - } + private static final HttpTransport TRANSPORT = new NetHttpTransport(); + private static final JacksonFactory JSON_FACTORY = new JacksonFactory(); + private static final Gson GSON = new Gson(); - @Override - public void prepare(Object configurationObject) { - - Preconditions.checkNotNull(config.getOauth().getPathToP12KeyFile()); - Preconditions.checkNotNull(config.getOauth().getAppName()); - Preconditions.checkNotNull(config.getOauth().getServiceAccountEmailAddress()); - - try { - this.plus = createPlusClient(); - } catch (IOException|GeneralSecurityException e) { - LOGGER.error("Failed to created oauth for GPlus : {}", e); - throw new RuntimeException(e); - } - // GPlus rate limits you to 5 calls per second, so there is not a need to execute more than one - // collector unless you have multiple oauth tokens - //TODO make this configurable based on the number of oauth tokens - this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); - this.datumQueue = new LinkedBlockingQueue<>(1000); - this.isComplete = new AtomicBoolean(false); - this.previousPullWasEmpty = false; - } + private GPlusConfiguration config; - @Override - public void startStream() { - - BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2); - for(UserInfo user : this.config.getGooglePlusUsers()) { - if(this.config.getDefaultAfterDate() != null && user.getAfterDate() == null) { - user.setAfterDate(this.config.getDefaultAfterDate()); - } - if(this.config.getDefaultBeforeDate() != null && user.getBeforeDate() == null) { - user.setBeforeDate(this.config.getDefaultBeforeDate()); - } - this.executor.submit(getDataCollector(backOffStrategy, this.datumQueue, this.plus, user)); - } - this.executor.shutdown(); - } + List<ListenableFuture<Object>> futures = new ArrayList<>(); - protected abstract Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo); - - @Override - public String getId() { - return STREAMS_ID; - } + private ListeningExecutorService executor; - @Override - public StreamsResultSet readCurrent() { - BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>(); - int batchCount = 0; - while(!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) { - StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue); - if(datum != null) { - ++batchCount; - ComponentUtils.offerUntilSuccess(datum, batch); - } - } - boolean pullIsEmpty = batch.isEmpty() && this.datumQueue.isEmpty() &&this.executor.isTerminated(); - this.isComplete.set(this.previousPullWasEmpty && pullIsEmpty); - this.previousPullWasEmpty = pullIsEmpty; - return new StreamsResultSet(batch); - } + private BlockingQueue<StreamsDatum> datumQueue; + private BlockingQueue<Runnable> runnables; + private AtomicBoolean isComplete; + private boolean previousPullWasEmpty; - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; - } + protected GoogleClientSecrets clientSecrets; + protected GoogleCredential credential; + protected Plus plus; - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; - } + public AbstractGPlusProvider() { + this.config = new ComponentConfigurator<>(GPlusConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gplus")); + } - @VisibleForTesting - protected Plus createPlusClient() throws IOException, GeneralSecurityException { - credential = new GoogleCredential.Builder() - .setJsonFactory(JSON_FACTORY) - .setTransport(TRANSPORT) - .setServiceAccountScopes(SCOPE) - .setServiceAccountId(this.config.getOauth().getServiceAccountEmailAddress()) - .setServiceAccountPrivateKeyFromP12File(new File(this.config.getOauth().getPathToP12KeyFile())) - .build(); - return new Plus.Builder(TRANSPORT,JSON_FACTORY, credential).setApplicationName(this.config.getOauth().getAppName()).build(); - } + public AbstractGPlusProvider(GPlusConfiguration config) { + this.config = config; + } - @Override - public void cleanUp() { - ComponentUtils.shutdownExecutor(this.executor, 10, 10); - this.executor = null; - } + @Override + public void prepare(Object configurationObject) { - public GPlusConfiguration getConfig() { - return config; - } + Preconditions.checkNotNull(config.getOauth().getPathToP12KeyFile()); + Preconditions.checkNotNull(config.getOauth().getAppName()); + Preconditions.checkNotNull(config.getOauth().getServiceAccountEmailAddress()); - public void setConfig(GPlusConfiguration config) { - this.config = config; + try { + this.plus = createPlusClient(); + } catch (IOException | GeneralSecurityException ex) { + LOGGER.error("Failed to created oauth for GPlus : {}", ex); + throw new RuntimeException(ex); } - - /** - * Set and overwrite the default before date that was read from the configuration file. - * @param defaultBeforeDate - */ - public void setDefaultBeforeDate(DateTime defaultBeforeDate) { - this.config.setDefaultBeforeDate(defaultBeforeDate); + // GPlus rate limits you to 5 calls per second, so there is not a need to execute more than one + // collector unless you have multiple oauth tokens + //TODO make this configurable based on the number of oauth tokens + this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + this.datumQueue = new LinkedBlockingQueue<>(1000); + this.isComplete = new AtomicBoolean(false); + this.previousPullWasEmpty = false; + } + + @Override + public void startStream() { + + BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2); + for (UserInfo user : this.config.getGooglePlusUsers()) { + if (this.config.getDefaultAfterDate() != null && user.getAfterDate() == null) { + user.setAfterDate(this.config.getDefaultAfterDate()); + } + if (this.config.getDefaultBeforeDate() != null && user.getBeforeDate() == null) { + user.setBeforeDate(this.config.getDefaultBeforeDate()); + } + this.executor.submit(getDataCollector(backOffStrategy, this.datumQueue, this.plus, user)); } - - /** - * Set and overwrite the default after date that was read from teh configuration file. - * @param defaultAfterDate - */ - public void setDefaultAfterDate(DateTime defaultAfterDate) { - this.config.setDefaultAfterDate(defaultAfterDate); + this.executor.shutdown(); + } + + protected abstract Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo); + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public StreamsResultSet readCurrent() { + BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>(); + int batchCount = 0; + while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) { + StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue); + if (datum != null) { + ++batchCount; + ComponentUtils.offerUntilSuccess(datum, batch); + } } - - /** - * Sets and overwrite the user info from the configuaration file. Uses the defaults before and after dates. - * @param userIds - */ - public void setUserInfoWithDefaultDates(Set<String> userIds) { - List<UserInfo> gPlusUsers = new LinkedList<>(); - for(String userId : userIds) { - UserInfo user = new UserInfo(); - user.setUserId(userId); - user.setAfterDate(this.config.getDefaultAfterDate()); - user.setBeforeDate(this.config.getDefaultBeforeDate()); - gPlusUsers.add(user); - } - this.config.setGooglePlusUsers(gPlusUsers); + boolean pullIsEmpty = batch.isEmpty() && this.datumQueue.isEmpty() && this.executor.isTerminated(); + this.isComplete.set(this.previousPullWasEmpty && pullIsEmpty); + this.previousPullWasEmpty = pullIsEmpty; + return new StreamsResultSet(batch); + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + @VisibleForTesting + protected Plus createPlusClient() throws IOException, GeneralSecurityException { + credential = new GoogleCredential.Builder() + .setJsonFactory(JSON_FACTORY) + .setTransport(TRANSPORT) + .setServiceAccountScopes(SCOPE) + .setServiceAccountId(this.config.getOauth().getServiceAccountEmailAddress()) + .setServiceAccountPrivateKeyFromP12File(new File(this.config.getOauth().getPathToP12KeyFile())) + .build(); + return new Plus.Builder(TRANSPORT,JSON_FACTORY, credential).setApplicationName(this.config.getOauth().getAppName()).build(); + } + + @Override + public void cleanUp() { + ComponentUtils.shutdownExecutor(this.executor, 10, 10); + this.executor = null; + } + + public GPlusConfiguration getConfig() { + return config; + } + + public void setConfig(GPlusConfiguration config) { + this.config = config; + } + + /** + * Set and overwrite the default before date that was read from the configuration file. + * @param defaultBeforeDate defaultBeforeDate + */ + public void setDefaultBeforeDate(DateTime defaultBeforeDate) { + this.config.setDefaultBeforeDate(defaultBeforeDate); + } + + /** + * Set and overwrite the default after date that was read from teh configuration file. + * @param defaultAfterDate defaultAfterDate + */ + public void setDefaultAfterDate(DateTime defaultAfterDate) { + this.config.setDefaultAfterDate(defaultAfterDate); + } + + /** + * Sets and overwrite the user info from the configuaration file. Uses the defaults before and after dates. + * @param userIds userIds + */ + public void setUserInfoWithDefaultDates(Set<String> userIds) { + List<UserInfo> gplusUsers = new LinkedList<>(); + for (String userId : userIds) { + UserInfo user = new UserInfo(); + user.setUserId(userId); + user.setAfterDate(this.config.getDefaultAfterDate()); + user.setBeforeDate(this.config.getDefaultBeforeDate()); + gplusUsers.add(user); } - - /** - * Set and overwrite user into from teh configuration file. Only sets after dater. - * @param usersAndAfterDates - */ - public void setUserInfoWithAfterDate(Map<String, DateTime> usersAndAfterDates) { - List<UserInfo> gPlusUsers = new LinkedList<>(); - for(String userId : usersAndAfterDates.keySet()) { - UserInfo user = new UserInfo(); - user.setUserId(userId); - user.setAfterDate(usersAndAfterDates.get(userId)); - gPlusUsers.add(user); - } - this.config.setGooglePlusUsers(gPlusUsers); + this.config.setGooglePlusUsers(gplusUsers); + } + + /** + * Set and overwrite user into from the configuration file. Only sets after date. + * @param usersAndAfterDates usersAndAfterDates + */ + public void setUserInfoWithAfterDate(Map<String, DateTime> usersAndAfterDates) { + List<UserInfo> gplusUsers = new LinkedList<>(); + for (String userId : usersAndAfterDates.keySet()) { + UserInfo user = new UserInfo(); + user.setUserId(userId); + user.setAfterDate(usersAndAfterDates.get(userId)); + gplusUsers.add(user); } - - @Override - public boolean isRunning() { - if (datumQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { - LOGGER.info("Completed"); - isComplete.set(true); - LOGGER.info("Exiting"); - } - return !isComplete.get(); + this.config.setGooglePlusUsers(gplusUsers); + } + + @Override + public boolean isRunning() { + if (datumQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { + LOGGER.info("Completed"); + isComplete.set(true); + LOGGER.info("Exiting"); } + return !isComplete.get(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java index 4991e94..20f5002 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java @@ -18,50 +18,54 @@ package com.google.gplus.provider; -import com.google.gplus.serializer.util.GooglePlusActivityUtil; -import org.apache.commons.lang.NotImplementedException; import org.apache.streams.data.ActivitySerializer; import org.apache.streams.pojo.json.Activity; + +import com.google.gplus.serializer.util.GooglePlusActivityUtil; + +import org.apache.commons.lang.NotImplementedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; - +/** + * GPlusActivitySerializer converts gplus activities to as1 activities. + */ public class GPlusActivitySerializer implements ActivitySerializer<com.google.api.services.plus.model.Activity> { - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivitySerializer.class); + private static final Logger LOGGER = LoggerFactory.getLogger(GPlusActivitySerializer.class); - AbstractGPlusProvider provider; + AbstractGPlusProvider provider; - public GPlusActivitySerializer(AbstractGPlusProvider provider) { + public GPlusActivitySerializer(AbstractGPlusProvider provider) { - this.provider = provider; - } + this.provider = provider; + } - public GPlusActivitySerializer() { - } + public GPlusActivitySerializer() { + } - @Override - public String serializationFormat() { - return "gplus.v1"; - } + @Override + public String serializationFormat() { + return "gplus.v1"; + } - @Override - public com.google.api.services.plus.model.Activity serialize(Activity deserialized) { - throw new NotImplementedException("Not currently implemented"); - } + @Override + public com.google.api.services.plus.model.Activity serialize(Activity deserialized) { + throw new NotImplementedException("Not currently implemented"); + } - @Override - public Activity deserialize(com.google.api.services.plus.model.Activity gplusActivity) { - Activity activity = new Activity(); + @Override + public Activity deserialize(com.google.api.services.plus.model.Activity gplusActivity) { + Activity activity = new Activity(); - GooglePlusActivityUtil.updateActivity(gplusActivity, activity); - return activity; - } + GooglePlusActivityUtil.updateActivity(gplusActivity, activity); + return activity; + } - @Override - public List<Activity> deserializeAll(List<com.google.api.services.plus.model.Activity> serializedList) { - throw new NotImplementedException("Not currently implemented"); - } + @Override + public List<Activity> deserializeAll(List<com.google.api.services.plus.model.Activity> serializedList) { + throw new NotImplementedException("Not currently implemented"); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java index 5be2f9c..edbc663 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java @@ -18,51 +18,52 @@ package com.google.gplus.provider; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; import org.apache.streams.util.api.requests.backoff.BackOffException; import org.apache.streams.util.api.requests.backoff.BackOffStrategy; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * GPlusDataCollector collects GPlus Data on behalf of providers. */ public abstract class GPlusDataCollector implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusDataCollector.class); - + private static final Logger LOGGER = LoggerFactory.getLogger(GPlusDataCollector.class); - /** - * Looks at the status code of the expception. If the code indicates that the request should be retried, - * it executes the back off strategy and returns true. - * @param gjre - * @param backOff - * @return returns true if the error code of the exception indicates the request should be retried. - */ - public boolean backoffAndIdentifyIfRetry(GoogleJsonResponseException gjre, BackOffStrategy backOff) throws BackOffException { - boolean tryAgain = false; - switch (gjre.getStatusCode()) { - case 400 : - LOGGER.warn("Bad Request : {}", gjre); - break; - case 401 : - LOGGER.warn("Invalid Credentials : {}", gjre); - case 403 : - LOGGER.warn("Possible rate limit exception. Retrying. : {}", gjre.getMessage()); - backOff.backOff(); - tryAgain = true; - break; - case 503 : - LOGGER.warn("Google Backend Service Error : {}", gjre); - break; - default: - LOGGER.warn("Google Service returned error : {}", gjre); - tryAgain = true; - backOff.backOff(); - break; - } - return tryAgain; + /** + * Looks at the status code of the exception. If the code indicates that the request should be retried, + * it executes the back off strategy and returns true. + * @param gjre GoogleJsonResponseException + * @param backOff BackOffStrategy + * @return returns true if the error code of the exception indicates the request should be retried. + */ + public boolean backoffAndIdentifyIfRetry(GoogleJsonResponseException gjre, BackOffStrategy backOff) throws BackOffException { + boolean tryAgain = false; + switch (gjre.getStatusCode()) { + case 400 : + LOGGER.warn("Bad Request : {}", gjre); + break; + case 401 : + LOGGER.warn("Invalid Credentials : {}", gjre); + break; + case 403 : + LOGGER.warn("Possible rate limit exception. Retrying. : {}", gjre.getMessage()); + backOff.backOff(); + tryAgain = true; + break; + case 503 : + LOGGER.warn("Google Backend Service Error : {}", gjre); + break; + default: + LOGGER.warn("Google Service returned error : {}", gjre); + tryAgain = true; + backOff.backOff(); + break; } - - + return tryAgain; + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java deleted file mode 100644 index 1f1ee2f..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.provider; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.pojo.json.Activity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Objects; -import java.util.Queue; -import java.util.Random; -import java.util.concurrent.BlockingQueue; - -public class GPlusEventProcessor implements Runnable { - - private final static Logger LOGGER = LoggerFactory.getLogger(GPlusEventProcessor.class); - - private ObjectMapper mapper = new ObjectMapper(); - - private BlockingQueue<String> inQueue; - private Queue<StreamsDatum> outQueue; - - private Class outClass; - - private GPlusActivitySerializer gPlusActivitySerializer = new GPlusActivitySerializer(); - - private final static String TERMINATE = "TERMINATE"; - - public GPlusEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) { - this.inQueue = inQueue; - this.outQueue = outQueue; - this.outClass = outClass; - } - - public GPlusEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class outClass) { - this.inQueue = inQueue; - this.outQueue = outQueue; - this.outClass = outClass; - } - - @Override - public void run() { - - while(true) { - try { - String item = inQueue.take(); - Thread.sleep(new Random().nextInt(100)); - if(Objects.equals(item, TERMINATE)) { - LOGGER.info("Terminating!"); - break; - } - - // first check for valid json - ObjectNode node = (ObjectNode)mapper.readTree(item); - - // if the target is string, just pass-through - if( String.class.equals(outClass)) - outQueue.offer(new StreamsDatum(item)); - else { - // convert to desired format - com.google.api.services.plus.model.Activity gplusActivity = mapper.readValue(item, com.google.api.services.plus.model.Activity.class); - - Activity streamsActivity = gPlusActivitySerializer.deserialize(gplusActivity); - - outQueue.offer(new StreamsDatum(streamsActivity)); - } - - } catch (Exception e) { - e.printStackTrace(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java index f475e5d..5585bfc 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java @@ -18,6 +18,11 @@ package com.google.gplus.provider; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; + import com.fasterxml.jackson.core.JsonGenerationException; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -30,10 +35,7 @@ import com.google.api.services.plus.Plus; import com.google.api.services.plus.model.Activity; import com.google.api.services.plus.model.ActivityFeed; import com.google.gplus.serializer.util.GPlusActivityDeserializer; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; + import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,90 +48,107 @@ import java.util.concurrent.BlockingQueue; */ public class GPlusUserActivityCollector extends GPlusDataCollector { - /** - * Key for all public activities - * https://developers.google.com/+/api/latest/activities/list - */ - private static final String PUBLIC_COLLECTION = "public"; - /** - * Max results allowed per request - * https://developers.google.com/+/api/latest/activities/list - */ - private static final long MAX_RESULTS = 100; - private static final int MAX_ATTEMPTS = 5; - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserActivityCollector.class); - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + /** + * Key for all public activities + * https://developers.google.com/+/api/latest/activities/list + */ + private static final String PUBLIC_COLLECTION = "public"; + /** + * Max results allowed per request + * https://developers.google.com/+/api/latest/activities/list + */ + private static final long MAX_RESULTS = 100; + private static final int MAX_ATTEMPTS = 5; + private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserActivityCollector.class); + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - static { //set up mapper for Google Activity Object - SimpleModule simpleModule = new SimpleModule(); - simpleModule.addDeserializer(Activity.class, new GPlusActivityDeserializer()); - simpleModule.addSerializer(com.google.api.client.util.DateTime.class, new StdSerializer<com.google.api.client.util.DateTime>(com.google.api.client.util.DateTime.class) { - @Override - public void serialize(com.google.api.client.util.DateTime dateTime, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonGenerationException { - jsonGenerator.writeString(dateTime.toStringRfc3339()); - } + static { //set up mapper for Google Activity Object + SimpleModule simpleModule = new SimpleModule(); + simpleModule.addDeserializer(Activity.class, new GPlusActivityDeserializer()); + simpleModule.addSerializer( + com.google.api.client.util.DateTime.class, + new StdSerializer<com.google.api.client.util.DateTime>(com.google.api.client.util.DateTime.class) { + @Override + public void serialize( + com.google.api.client.util.DateTime dateTime, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) + throws IOException { + jsonGenerator.writeString(dateTime.toStringRfc3339()); + } }); - MAPPER.registerModule(simpleModule); - MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } + MAPPER.registerModule(simpleModule); + MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } - private BlockingQueue<StreamsDatum> datumQueue; - private BackOffStrategy backOff; - private Plus gPlus; - private UserInfo userInfo; + private BlockingQueue<StreamsDatum> datumQueue; + private BackOffStrategy backOff; + private Plus plus; + private UserInfo userInfo; - public GPlusUserActivityCollector(Plus gPlus, BlockingQueue<StreamsDatum> datumQueue, BackOffStrategy backOff, UserInfo userInfo) { - this.gPlus = gPlus; - this.datumQueue = datumQueue; - this.backOff = backOff; - this.userInfo = userInfo; - } + /** + * GPlusUserActivityCollector constructor. + * @param plus Plus + * @param datumQueue BlockingQueue<StreamsDatum> + * @param backOff BackOffStrategy + * @param userInfo UserInfo + */ + public GPlusUserActivityCollector(Plus plus, BlockingQueue<StreamsDatum> datumQueue, BackOffStrategy backOff, UserInfo userInfo) { + this.plus = plus; + this.datumQueue = datumQueue; + this.backOff = backOff; + this.userInfo = userInfo; + } - @Override - public void run() { - collectActivityData(); - } + @Override + public void run() { + collectActivityData(); + } - protected void collectActivityData() { + protected void collectActivityData() { + try { + ActivityFeed feed = null; + boolean tryAgain = false; + int attempt = 0; + DateTime afterDate = userInfo.getAfterDate(); + DateTime beforeDate = userInfo.getBeforeDate(); + do { try { - ActivityFeed feed = null; - boolean tryAgain = false; - int attempt = 0; - DateTime afterDate = userInfo.getAfterDate(); - DateTime beforeDate = userInfo.getBeforeDate(); - do { - try { - if(feed == null) { - feed = this.gPlus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION).setMaxResults(MAX_RESULTS).execute(); - } else { - feed = this.gPlus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION).setMaxResults(MAX_RESULTS).setPageToken(feed.getNextPageToken()).execute(); - } - this.backOff.reset(); //successful pull reset api. - for(com.google.api.services.plus.model.Activity activity : feed.getItems()) { - DateTime published = new DateTime(activity.getPublished().getValue()); - if( (afterDate == null && beforeDate == null) - || (beforeDate == null && afterDate.isBefore(published)) - || (afterDate == null && beforeDate.isAfter(published)) - || ((afterDate != null && beforeDate != null) && (afterDate.isBefore(published) && beforeDate.isAfter(published)))) { - String json = MAPPER.writeValueAsString(activity); - this.datumQueue.put(new StreamsDatum(json, activity.getId())); - } else if(afterDate != null && afterDate.isAfter(published)) { - feed.setNextPageToken(null); // do not fetch next page - break; - } - } - } catch (GoogleJsonResponseException gjre) { - tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff); - ++attempt; - } - } while((tryAgain || (feed != null && feed.getNextPageToken() != null)) && attempt < MAX_ATTEMPTS); - } catch (Throwable t) { - if(t instanceof InterruptedException) { - Thread.currentThread().interrupt(); + if (feed == null) { + feed = this.plus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION) + .setMaxResults(MAX_RESULTS).execute(); + } else { + feed = this.plus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION) + .setMaxResults(MAX_RESULTS) + .setPageToken(feed.getNextPageToken()).execute(); + } + this.backOff.reset(); //successful pull reset api. + for (com.google.api.services.plus.model.Activity activity : feed.getItems()) { + DateTime published = new DateTime(activity.getPublished().getValue()); + if ((afterDate == null && beforeDate == null) + || (beforeDate == null && afterDate.isBefore(published)) + || (afterDate == null && beforeDate.isAfter(published)) + || ((afterDate != null && beforeDate != null) && (afterDate.isBefore(published) && beforeDate.isAfter(published)))) { + String json = MAPPER.writeValueAsString(activity); + this.datumQueue.put(new StreamsDatum(json, activity.getId())); + } else if (afterDate != null && afterDate.isAfter(published)) { + feed.setNextPageToken(null); // do not fetch next page + break; } - t.printStackTrace(); - LOGGER.warn("Unable to pull Activities for user={} : {}",this.userInfo.getUserId(), t); + } + } catch (GoogleJsonResponseException gjre) { + tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff); + ++attempt; } + } + while ((tryAgain || (feed != null && feed.getNextPageToken() != null)) && attempt < MAX_ATTEMPTS); + } catch (Throwable th) { + if (th instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + th.printStackTrace(); + LOGGER.warn("Unable to pull Activities for user={} : {}",this.userInfo.getUserId(), th); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java index e6b2223..97b08fd 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java @@ -18,13 +18,6 @@ package com.google.gplus.provider; -import com.google.api.services.plus.Plus; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Uninterruptibles; -import com.google.gson.Gson; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; @@ -33,6 +26,14 @@ import org.apache.streams.google.gplus.GPlusConfiguration; import org.apache.streams.google.gplus.configuration.UserInfo; import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import com.google.api.services.plus.Plus; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Uninterruptibles; +import com.google.gson.Gson; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; + import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; @@ -43,76 +44,88 @@ import java.util.concurrent.TimeUnit; /** * Retrieve recent activity from a list of accounts. * + * <p/> * To use from command line: * + * <p/> * Supply (at least) the following required configuration in application.conf: * + * <p/> * gplus.oauth.pathToP12KeyFile * gplus.oauth.serviceAccountEmailAddress * gplus.apiKey * gplus.googlePlusUsers * + * <p/> * Launch using: * + * <p/> * mvn exec:java -Dexec.mainClass=com.google.gplus.provider.GPlusUserActivityProvider -Dexec.args="application.conf activity.json" */ -public class GPlusUserActivityProvider extends AbstractGPlusProvider{ - - private final static String STREAMS_ID = "GPlusUserActivityProvider"; - - public GPlusUserActivityProvider() { - super(); - } - - public GPlusUserActivityProvider(GPlusConfiguration config) { - super(config); - } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) { - return new GPlusUserActivityCollector(plus, queue, strategy, userInfo); - } - - public static void main(String[] args) throws Exception { - - Preconditions.checkArgument(args.length >= 2); - - String configfile = args[0]; - String outfile = args[1]; - - Config reference = ConfigFactory.load(); - File conf_file = new File(configfile); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - GPlusConfiguration config = new ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, "gplus"); - GPlusUserActivityProvider provider = new GPlusUserActivityProvider(config); - - Gson gson = new Gson(); - - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - for (StreamsDatum datum : provider.readCurrent()) { - String json; - if (datum.getDocument() instanceof String) - json = (String) datum.getDocument(); - else - json = gson.toJson(datum.getDocument()); - outStream.println(json); - } - } while( provider.isRunning()); - provider.cleanUp(); - outStream.flush(); +public class GPlusUserActivityProvider extends AbstractGPlusProvider { + + private static final String STREAMS_ID = "GPlusUserActivityProvider"; + + public GPlusUserActivityProvider() { + super(); + } + + public GPlusUserActivityProvider(GPlusConfiguration config) { + super(config); + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) { + return new GPlusUserActivityCollector(plus, queue, strategy, userInfo); + } + + /** + * Retrieve recent activity from a list of accounts. + * @param args args + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File file = new File(configfile); + assert (file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + GPlusConfiguration config = new ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, "gplus"); + GPlusUserActivityProvider provider = new GPlusUserActivityProvider(config); + + Gson gson = new Gson(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + for (StreamsDatum datum : provider.readCurrent()) { + String json; + if (datum.getDocument() instanceof String) { + json = (String) datum.getDocument(); + } else { + json = gson.toJson(datum.getDocument()); + } + outStream.println(json); + } } + while ( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java index 78a1649..3da3468 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java @@ -18,6 +18,11 @@ package com.google.gplus.provider; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.google.gplus.configuration.UserInfo; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.util.api.requests.backoff.BackOffStrategy; + import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; @@ -25,73 +30,77 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.services.plus.Plus; import com.google.api.services.plus.model.Person; import com.google.gplus.serializer.util.GPlusPersonDeserializer; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.api.requests.backoff.BackOffStrategy; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; /** - * Collects user profile information for a specific GPlus user + * Collects user profile information for a specific GPlus user. */ public class GPlusUserDataCollector extends GPlusDataCollector { - private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserDataCollector.class); - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - private static final int MAX_ATTEMPTS = 5; + private static final Logger LOGGER = LoggerFactory.getLogger(GPlusUserDataCollector.class); + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private static final int MAX_ATTEMPTS = 5; - static { //set up Mapper for Person objects - SimpleModule simpleModule = new SimpleModule(); - simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer()); - MAPPER.registerModule(simpleModule); - MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } + static { //set up Mapper for Person objects + SimpleModule simpleModule = new SimpleModule(); + simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer()); + MAPPER.registerModule(simpleModule); + MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } - private BackOffStrategy backOffStrategy; - private Plus gPlus; - private BlockingQueue<StreamsDatum> datumQueue; - private UserInfo userInfo; + private BackOffStrategy backOffStrategy; + private Plus plus; + private BlockingQueue<StreamsDatum> datumQueue; + private UserInfo userInfo; + /** + * GPlusUserDataCollector constructor. + * @param plus Plus + * @param backOffStrategy BackOffStrategy + * @param datumQueue BlockingQueue of StreamsDatum + * @param userInfo UserInfo + */ + public GPlusUserDataCollector(Plus plus, BackOffStrategy backOffStrategy, BlockingQueue<StreamsDatum> datumQueue, UserInfo userInfo) { + this.plus = plus; + this.backOffStrategy = backOffStrategy; + this.datumQueue = datumQueue; + this.userInfo = userInfo; + } - public GPlusUserDataCollector(Plus gPlus, BackOffStrategy backOffStrategy, BlockingQueue<StreamsDatum> datumQueue, UserInfo userInfo) { - this.gPlus = gPlus; - this.backOffStrategy = backOffStrategy; - this.datumQueue = datumQueue; - this.userInfo = userInfo; - } - - protected void queueUserHistory() { + protected void queueUserHistory() { + try { + boolean tryAgain = false; + int attempts = 0; + com.google.api.services.plus.model.Person person = null; + do { try { - boolean tryAgain = false; - int attempts = 0; - com.google.api.services.plus.model.Person person = null; - do { - try { - person = this.gPlus.people().get(userInfo.getUserId()).execute(); - this.backOffStrategy.reset(); - tryAgain = person == null; - } catch (GoogleJsonResponseException gjre) { - tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOffStrategy); - } - ++attempts; - } while(tryAgain && attempts < MAX_ATTEMPTS); - String json = MAPPER.writeValueAsString(person); - this.datumQueue.put(new StreamsDatum(json, person.getId())); - } catch (Throwable t) { - LOGGER.warn("Unable to pull user data for user={} : {}", userInfo.getUserId(), t); - if(t instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } + person = this.plus.people().get(userInfo.getUserId()).execute(); + this.backOffStrategy.reset(); + tryAgain = person == null; + } catch (GoogleJsonResponseException gjre) { + tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOffStrategy); } + ++attempts; + } + while (tryAgain && attempts < MAX_ATTEMPTS); + String json = MAPPER.writeValueAsString(person); + this.datumQueue.put(new StreamsDatum(json, person.getId())); + } catch (Throwable throwable) { + LOGGER.warn("Unable to pull user data for user={} : {}", userInfo.getUserId(), throwable); + if (throwable instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } } + } - @Override - public void run() { - queueUserHistory(); - } + @Override + public void run() { + queueUserHistory(); + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java index 1541818..28bcb55 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java @@ -18,25 +18,22 @@ package com.google.gplus.provider; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.services.plus.Plus; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Uninterruptibles; -import com.google.gson.Gson; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.google.gplus.GPlusConfiguration; import org.apache.streams.google.gplus.configuration.UserInfo; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.util.api.requests.backoff.BackOffStrategy; +import com.google.api.services.plus.Plus; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Uninterruptibles; +import com.google.gson.Gson; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; + import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; @@ -48,78 +45,90 @@ import java.util.concurrent.TimeUnit; /** * Retrieve current profile status for a list of accounts. * + * <p/> * To use from command line: * + * <p/> * Supply (at least) the following required configuration in application.conf: * + * <p/> * gplus.oauth.pathToP12KeyFile * gplus.oauth.serviceAccountEmailAddress * gplus.apiKey * gplus.googlePlusUsers * + * <p/> * Launch using: * + * <p/> * mvn exec:java -Dexec.mainClass=com.google.gplus.provider.GPlusUserDataProvider -Dexec.args="application.conf profiles.json" */ -public class GPlusUserDataProvider extends AbstractGPlusProvider{ - - public final static String STREAMS_ID = "GPlusUserDataProvider"; - - public GPlusUserDataProvider() { - super(); - } - - public GPlusUserDataProvider(GPlusConfiguration config) { - super(config); - } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) { - return new GPlusUserDataCollector(plus, strategy, queue, userInfo); - } - - public static void main(String[] args) throws Exception { - - Preconditions.checkArgument(args.length >= 2); - - String configfile = args[0]; - String outfile = args[1]; - - Config reference = ConfigFactory.load(); - File conf_file = new File(configfile); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - GPlusConfiguration config = new ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, "gplus"); - GPlusUserDataProvider provider = new GPlusUserDataProvider(config); - - Gson gson = new Gson(); - - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); - while(iterator.hasNext()) { - StreamsDatum datum = iterator.next(); - String json; - if (datum.getDocument() instanceof String) - json = (String) datum.getDocument(); - else - json = gson.toJson(datum.getDocument()); - outStream.println(json); - } - } while( provider.isRunning()); - provider.cleanUp(); - outStream.flush(); +public class GPlusUserDataProvider extends AbstractGPlusProvider { + + public static final String STREAMS_ID = "GPlusUserDataProvider"; + + public GPlusUserDataProvider() { + super(); + } + + public GPlusUserDataProvider(GPlusConfiguration config) { + super(config); + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) { + return new GPlusUserDataCollector(plus, strategy, queue, userInfo); + } + + /** + * Retrieve current profile status for a list of accounts. + * @param args args + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File file = new File(configfile); + assert (file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + GPlusConfiguration config = new ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, "gplus"); + GPlusUserDataProvider provider = new GPlusUserDataProvider(config); + + Gson gson = new Gson(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while (iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + if (datum.getDocument() instanceof String) { + json = (String) datum.getDocument(); + } else { + json = gson.toJson(datum.getDocument()); + } + outStream.println(json); + } } + while ( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } }
