http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java deleted file mode 100644 index 120efe9..0000000 --- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.mongo; - -import com.google.common.base.Objects; -import com.typesafe.config.Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by sblackmon on 12/10/13. - */ -public class MongoConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(MongoConfigurator.class); - - public static MongoConfiguration detectConfiguration(Config mongo) { - - MongoConfiguration mongoConfiguration = new MongoConfiguration(); - - mongoConfiguration.setHost(mongo.getString("host")); - mongoConfiguration.setPort(new Long(mongo.getInt("port"))); - mongoConfiguration.setDb(mongo.getString("db")); - mongoConfiguration.setCollection(mongo.getString("collection")); - - if( mongo.hasPath("user")) - mongoConfiguration.setUser(mongo.getString("user")); - if( mongo.hasPath("password")) - mongoConfiguration.setPassword(mongo.getString("password")); - return mongoConfiguration; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java index 9772f95..ba77ff1 100644 --- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java +++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java @@ -23,11 +23,19 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Queues; -import com.mongodb.*; -import com.mongodb.util.JSON; -import com.typesafe.config.Config; +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.*; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistReader; +import org.apache.streams.core.StreamsResultSet; import org.apache.streams.jackson.StreamsJacksonMapper; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -35,11 +43,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.math.BigInteger; -import java.net.UnknownHostException; -import java.util.List; import java.util.Queue; -import java.util.Random; -import java.util.concurrent.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -49,7 +57,6 @@ public class MongoPersistReader implements StreamsPersistReader { public static final String STREAMS_ID = "MongoPersistReader"; private final static Logger LOGGER = LoggerFactory.getLogger(MongoPersistReader.class); - private final static long MAX_WRITE_LATENCY = 1000; protected volatile Queue<StreamsDatum> persistQueue; @@ -59,7 +66,6 @@ public class MongoPersistReader implements StreamsPersistReader { private ExecutorService executor; private MongoConfiguration config; - private MongoPersistReaderTask readerTask; protected MongoClient client; protected DB db; @@ -67,13 +73,11 @@ public class MongoPersistReader implements StreamsPersistReader { protected DBCursor cursor; - protected List<DBObject> insertBatch = Lists.newArrayList(); - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); public MongoPersistReader() { - Config config = StreamsConfigurator.config.getConfig("mongo"); - this.config = MongoConfigurator.detectConfiguration(config); + this.config = new ComponentConfigurator<>(MongoConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo")); } public MongoPersistReader(MongoConfiguration config) { @@ -81,8 +85,8 @@ public class MongoPersistReader implements StreamsPersistReader { } public MongoPersistReader(Queue<StreamsDatum> persistQueue) { - Config config = StreamsConfigurator.config.getConfig("mongo"); - this.config = MongoConfigurator.detectConfiguration(config); + this.config = new ComponentConfigurator<>(MongoConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo")); this.persistQueue = persistQueue; } @@ -95,14 +99,6 @@ public class MongoPersistReader implements StreamsPersistReader { } public void stop() { - -// try { -// client.st -// client.requestDone(); -// } catch (Exception e) { -// } finally { -// client.requestDone(); -// } } @Override @@ -121,8 +117,7 @@ public class MongoPersistReader implements StreamsPersistReader { cursor = collection.find(); - if( cursor == null || - cursor.hasNext() == false ) + if( cursor == null || !cursor.hasNext()) throw new RuntimeException("Collection not present or empty!"); persistQueue = constructQueue(); @@ -149,9 +144,8 @@ public class MongoPersistReader implements StreamsPersistReader { LOGGER.warn("document isn't valid JSON."); return null; } - StreamsDatum datum = new StreamsDatum(objectNode, id); - return datum; + return new StreamsDatum(objectNode, id); } private synchronized void connectToMongo() { @@ -161,7 +155,7 @@ public class MongoPersistReader implements StreamsPersistReader { if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) { MongoCredential credential = MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray()); - client = new MongoClient(serverAddress, Lists.<MongoCredential>newArrayList(credential)); + client = new MongoClient(serverAddress, Lists.newArrayList(credential)); } else { client = new MongoClient(serverAddress); } @@ -178,15 +172,12 @@ public class MongoPersistReader implements StreamsPersistReader { @Override public StreamsResultSet readAll() { - DBCursor cursor = collection.find(); - try { - while(cursor.hasNext()) { + try (DBCursor cursor = collection.find()) { + while (cursor.hasNext()) { DBObject dbObject = cursor.next(); StreamsDatum datum = prepareDatum(dbObject); write(datum); } - } finally { - cursor.close(); } return readCurrent(); @@ -196,14 +187,14 @@ public class MongoPersistReader implements StreamsPersistReader { public void startStream() { LOGGER.debug("startStream"); - readerTask = new MongoPersistReaderTask(this); + MongoPersistReaderTask readerTask = new MongoPersistReaderTask(this); Thread readerTaskThread = new Thread(readerTask); Future future = executor.submit(readerTaskThread); while( !future.isDone() && !future.isCancelled()) { try { Thread.sleep(1000); - } catch (InterruptedException e) {} + } catch (InterruptedException ignored) {} } executor.shutdown(); @@ -219,9 +210,6 @@ public class MongoPersistReader implements StreamsPersistReader { lock.writeLock().lock(); current = new StreamsResultSet(persistQueue); current.setCounter(new DatumStatusCounter()); -// current.getCounter().add(countersCurrent); -// countersTotal.add(countersCurrent); -// countersCurrent = new DatumStatusCounter(); persistQueue = constructQueue(); } finally { lock.writeLock().unlock(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java index 4938dcc..5f6ac1f 100644 --- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java +++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java @@ -23,30 +23,29 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.mongodb.DB; -import com.mongodb.DBAddress; import com.mongodb.DBCollection; import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; import com.mongodb.util.JSON; -import com.typesafe.config.Config; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.jackson.StreamsJacksonMapper; -import org.bson.types.ObjectId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Queue; import java.util.Random; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -71,12 +70,13 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable { protected DB db; protected DBCollection collection; - protected List<DBObject> insertBatch = Lists.newArrayList(); + protected List<DBObject> insertBatch = new ArrayList<>(); protected final ReadWriteLock lock = new ReentrantReadWriteLock(); public MongoPersistWriter() { - this(MongoConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("mongo"))); + this(new ComponentConfigurator<>(MongoConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo"))); } public MongoPersistWriter(MongoConfiguration config) { @@ -112,7 +112,7 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable { lock.writeLock().lock(); collection.insert(insertBatch); lastWrite.set(System.currentTimeMillis()); - insertBatch = Lists.newArrayList(); + insertBatch = new ArrayList<>(); } finally { lock.writeLock().unlock(); } @@ -179,7 +179,7 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable { } try { Thread.sleep(new Random().nextInt(1)); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } } @@ -187,7 +187,7 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable { @Override public void prepare(Object configurationObject) { - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + this.persistQueue = new ConcurrentLinkedQueue<>(); start(); } @@ -240,7 +240,7 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable { if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) { MongoCredential credential = MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray()); - client = new MongoClient(serverAddress, Lists.<MongoCredential>newArrayList(credential)); + client = new MongoClient(serverAddress, Lists.newArrayList(credential)); } else { client = new MongoClient(serverAddress); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java index bac96bf..7aa632e 100644 --- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java +++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java @@ -18,10 +18,9 @@ package org.apache.streams.peoplepattern; -import com.google.common.collect.Maps; -import org.apache.streams.components.http.HttpConfigurator; import org.apache.streams.components.http.HttpProcessorConfiguration; import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.pojo.extensions.ExtensionUtil; @@ -31,6 +30,7 @@ import org.apache.streams.pojo.json.Actor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.Map; /** @@ -43,7 +43,8 @@ public class AccountTypeProcessor extends SimpleHTTPGetProcessor { private final static Logger LOGGER = LoggerFactory.getLogger(AccountTypeProcessor.class); public AccountTypeProcessor() { - this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("peoplepattern"))); + this(new ComponentConfigurator<>(HttpProcessorConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("peoplepattern"))); } public AccountTypeProcessor(HttpProcessorConfiguration peoplePatternConfiguration) { @@ -65,11 +66,11 @@ public class AccountTypeProcessor extends SimpleHTTPGetProcessor { Actor actor = activity.getActor(); ActivityObject actorObject = mapper.convertValue(actor, ActivityObject.class); String username = (String) ExtensionUtil.getInstance().getExtension(actorObject, "screenName"); - Map<String, String> params = Maps.newHashMap(); + Map<String, String> params = new HashMap<>(); params.put("id", actor.getId()); params.put("name", actor.getDisplayName()); params.put("username", username); params.put("description", actor.getSummary()); return params; } -}; +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java index 238685d..4f6af35 100644 --- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java +++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java @@ -18,10 +18,9 @@ package org.apache.streams.peoplepattern; -import com.google.common.collect.Maps; -import org.apache.streams.components.http.HttpConfigurator; import org.apache.streams.components.http.HttpProcessorConfiguration; import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.pojo.extensions.ExtensionUtil; @@ -31,6 +30,7 @@ import org.apache.streams.pojo.json.Actor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.Map; /** @@ -43,7 +43,8 @@ public class DemographicsProcessor extends SimpleHTTPGetProcessor { private final static Logger LOGGER = LoggerFactory.getLogger(DemographicsProcessor.class); public DemographicsProcessor() { - this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("peoplepattern"))); + this(new ComponentConfigurator<>(HttpProcessorConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("peoplepattern"))); } public DemographicsProcessor(HttpProcessorConfiguration peoplePatternConfiguration) { @@ -65,7 +66,7 @@ public class DemographicsProcessor extends SimpleHTTPGetProcessor { Actor actor = activity.getActor(); ActivityObject actorObject = mapper.convertValue(actor, ActivityObject.class); String username = (String) ExtensionUtil.getInstance().getExtension(actorObject, "screenName"); - Map<String, String> params = Maps.newHashMap(); + Map<String, String> params = new HashMap<>(); params.put("id", actor.getId()); params.put("name", actor.getDisplayName()); params.put("username", username); @@ -73,4 +74,4 @@ public class DemographicsProcessor extends SimpleHTTPGetProcessor { return params; } -}; +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookStreamConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookStreamConfigurator.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookStreamConfigurator.java deleted file mode 100644 index da1b778..0000000 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookStreamConfigurator.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.facebook.provider; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigException; -import org.apache.streams.facebook.FacebookConfiguration; -import org.apache.streams.facebook.FacebookOAuthConfiguration; -import org.apache.streams.facebook.FacebookUserInformationConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class FacebookStreamConfigurator { - private final static Logger LOGGER = LoggerFactory.getLogger(FacebookStreamConfigurator.class); - private final static ObjectMapper mapper = new ObjectMapper(); - - - public static FacebookUserInformationConfiguration detectFacebookConfiguration(Config config) { - FacebookUserInformationConfiguration facebookUserInformationConfiguration = new FacebookUserInformationConfiguration(); - - try { - Config oauth = config.getConfig("oauth"); - FacebookOAuthConfiguration facebookOAuthConfiguration = new FacebookOAuthConfiguration(); - facebookOAuthConfiguration.setAppAccessToken(oauth.getString("appAccessToken")); - facebookOAuthConfiguration.setAppSecret(oauth.getString("appSecret")); - facebookOAuthConfiguration.setUserAccessToken(oauth.getString("userAccessToken")); - facebookOAuthConfiguration.setAppId(oauth.getString("appId")); - - facebookUserInformationConfiguration.setOauth(facebookOAuthConfiguration); - } catch( ConfigException ce ) { - LOGGER.error("Exception while extracting Facebook oauth token: {}", ce.getMessage()); - } - - return facebookUserInformationConfiguration; - } - - public static FacebookConfiguration detectConfiguration(Config config) { - - FacebookConfiguration facebookConfiguration = mapper.convertValue(detectFacebookConfiguration(config), FacebookConfiguration.class); - - return facebookConfiguration; - } - - public static FacebookUserInformationConfiguration detectFacebookUserInformationConfiguration(Config config) { - return mapper.convertValue(detectFacebookConfiguration(config), FacebookUserInformationConfiguration.class); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java deleted file mode 100644 index d32bc9c..0000000 --- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/GMailConfigurator.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gmail; - -import com.google.gmail.GMailConfiguration; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigException; -import org.apache.streams.config.StreamsConfigurator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by sblackmon on 12/10/13. - */ -public class GMailConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(GMailConfigurator.class); - - public static GMailConfiguration detectConfiguration(Config gmail) { - - GMailConfiguration gmailConfiguration = new GMailConfiguration(); - - gmailConfiguration.setUserName(gmail.getString("username")); - gmailConfiguration.setPassword(gmail.getString("password")); - - return gmailConfiguration; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java index 5872e27..5cc6fe7 100644 --- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java +++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java @@ -19,32 +19,40 @@ package com.google.gmail.provider; import com.google.common.base.Preconditions; -import com.google.common.collect.Queues; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.gmail.GMailConfiguration; -import com.google.gmail.GMailConfigurator; import com.googlecode.gmail4j.GmailClient; import com.googlecode.gmail4j.GmailConnection; import com.googlecode.gmail4j.http.HttpGmailConnection; import com.googlecode.gmail4j.javamail.ImapGmailClient; import com.googlecode.gmail4j.javamail.ImapGmailConnection; import com.googlecode.gmail4j.rss.RssGmailClient; -import com.typesafe.config.Config; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.*; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.math.BigInteger; import java.util.Queue; -import java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Created by sblackmon on 12/10/13. */ -public class GMailProvider implements StreamsProvider, DatumStatusCountable { +public class GMailProvider implements StreamsProvider, Serializable { public final static String STREAMS_ID = "GMailProvider"; @@ -62,9 +70,9 @@ public class GMailProvider implements StreamsProvider, DatumStatusCountable { this.config = config; } - protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000); + protected BlockingQueue inQueue = new LinkedBlockingQueue<>(10000); - protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>(); protected Future task; public BlockingQueue<Object> getInQueue() { @@ -83,8 +91,8 @@ public class GMailProvider implements StreamsProvider, DatumStatusCountable { } public GMailProvider() { - Config config = StreamsConfigurator.config.getConfig("gmail"); - this.config = GMailConfigurator.detectConfiguration(config); + this.config = new ComponentConfigurator<>(GMailConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail")); } public GMailProvider(GMailConfiguration config) { @@ -92,8 +100,8 @@ public class GMailProvider implements StreamsProvider, DatumStatusCountable { } public GMailProvider(Class klass) { - Config config = StreamsConfigurator.config.getConfig("gmail"); - this.config = GMailConfigurator.detectConfiguration(config); + this.config = new ComponentConfigurator<>(GMailConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail")); this.klass = klass; } @@ -123,7 +131,7 @@ public class GMailProvider implements StreamsProvider, DatumStatusCountable { StreamsResultSet current; synchronized( GMailProvider.class ) { - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue)); + current = new StreamsResultSet(new ConcurrentLinkedQueue<>(providerQueue)); current.setCounter(new DatumStatusCounter()); current.getCounter().add(countersCurrent); countersTotal.add(countersCurrent); @@ -179,9 +187,4 @@ public class GMailProvider implements StreamsProvider, DatumStatusCountable { e.printStackTrace(); } } - - @Override - public DatumStatusCounter getDatumStatusCounter() { - return null; - } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java index b9e9b2d..734e711 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java @@ -18,23 +18,20 @@ package com.google.gplus.provider; -import com.google.api.client.googleapis.auth.oauth2.GoogleAuthorizationCodeFlow; import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets; import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; -import com.google.api.client.googleapis.auth.oauth2.GoogleTokenResponse; import com.google.api.client.http.HttpTransport; import com.google.api.client.http.javanet.NetHttpTransport; import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.services.plus.Plus; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; -import com.typesafe.config.Config; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; @@ -53,14 +50,14 @@ import java.io.IOException; import java.math.BigInteger; import java.security.GeneralSecurityException; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -94,8 +91,8 @@ public abstract class AbstractGPlusProvider implements StreamsProvider { protected Plus plus; public AbstractGPlusProvider() { - Config config = StreamsConfigurator.config.getConfig("gplus"); - this.config = GPlusConfigurator.detectConfiguration(config); + this.config = new ComponentConfigurator<>(GPlusConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("gplus")); } public AbstractGPlusProvider(GPlusConfiguration config) { @@ -221,7 +218,7 @@ public abstract class AbstractGPlusProvider implements StreamsProvider { * @param userIds */ public void setUserInfoWithDefaultDates(Set<String> userIds) { - List<UserInfo> gPlusUsers = Lists.newLinkedList(); + List<UserInfo> gPlusUsers = new LinkedList<>(); for(String userId : userIds) { UserInfo user = new UserInfo(); user.setUserId(userId); @@ -237,7 +234,7 @@ public abstract class AbstractGPlusProvider implements StreamsProvider { * @param usersAndAfterDates */ public void setUserInfoWithAfterDate(Map<String, DateTime> usersAndAfterDates) { - List<UserInfo> gPlusUsers = Lists.newLinkedList(); + List<UserInfo> gPlusUsers = new LinkedList<>(); for(String userId : usersAndAfterDates.keySet()) { UserInfo user = new UserInfo(); user.setUserId(userId); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java deleted file mode 100644 index 1999130..0000000 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusConfigurator.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.google.gplus.provider; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.google.gplus.GPlusConfiguration; -import org.apache.streams.google.gplus.GPlusOAuthConfiguration; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Created by sblackmon on 12/10/13. - */ -public class GPlusConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(GPlusConfigurator.class); - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - public static GPlusConfiguration detectConfiguration(Config config) { - GPlusConfiguration configuration = null; - try { - configuration = MAPPER.readValue(config.root().render(ConfigRenderOptions.concise()), GPlusConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - } - Preconditions.checkNotNull(configuration); - - return configuration; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java deleted file mode 100644 index 11e6d79..0000000 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.instagram; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * - */ -public class InstagramConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(InstagramConfigurator.class); - private final static ObjectMapper mapper = new ObjectMapper(); - - - public static InstagramConfiguration detectInstagramConfiguration(Config config) { - - InstagramConfiguration instagramConfiguration = null; - try { - instagramConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), InstagramConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - } - Preconditions.checkNotNull(instagramConfiguration); - - return instagramConfiguration; - } - - public static InstagramUserInformationConfiguration detectInstagramUserInformationConfiguration(Config config) { - - InstagramUserInformationConfiguration instagramConfiguration = null; - try { - instagramConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), InstagramUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - } - Preconditions.checkNotNull(instagramConfiguration); - - return instagramConfiguration; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java index fe4b8da..fbba2a2 100644 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java @@ -14,18 +14,16 @@ specific language governing permissions and limitations under the License. */ package org.apache.streams.instagram.provider; -import com.google.common.collect.Queues; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.instagram.InstagramConfiguration; -import org.apache.streams.instagram.InstagramConfigurator; import org.apache.streams.instagram.User; import org.apache.streams.instagram.UsersInfo; import org.apache.streams.util.ComponentUtils; @@ -37,11 +35,13 @@ import org.slf4j.LoggerFactory; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; @@ -60,16 +60,16 @@ public abstract class InstagramAbstractProvider implements StreamsProvider { private static final int MAX_BATCH_SIZE = 2000; protected InstagramConfiguration config; - private InstagramDataCollector dataCollector; protected Queue<StreamsDatum> dataQueue; private ListeningExecutorService executorService; - List<ListenableFuture<Object>> futures = new ArrayList<>(); + private List<ListenableFuture<Object>> futures = new ArrayList<>(); private AtomicBoolean isCompleted; public InstagramAbstractProvider() { - this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram")); + this.config = new ComponentConfigurator<>(InstagramConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("instagram")); } public InstagramAbstractProvider(InstagramConfiguration config) { @@ -89,22 +89,22 @@ public abstract class InstagramAbstractProvider implements StreamsProvider { @Override public void startStream() { - this.dataCollector = getInstagramDataCollector(); + InstagramDataCollector dataCollector = getInstagramDataCollector(); this.executorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - ListenableFuture future = this.executorService.submit(this.dataCollector); + ListenableFuture future = this.executorService.submit(dataCollector); this.futures.add(future); } /** * Return the data collector to use to connect to instagram. - * @return + * @return {@link InstagramDataCollector} */ protected abstract InstagramDataCollector getInstagramDataCollector(); @Override public StreamsResultSet readCurrent() { - Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue(); + Queue<StreamsDatum> batch = new ConcurrentLinkedQueue<>(); int count = 0; while(!this.dataQueue.isEmpty() && count < MAX_BATCH_SIZE) { ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.dataQueue), batch); @@ -125,7 +125,7 @@ public abstract class InstagramAbstractProvider implements StreamsProvider { @Override public void prepare(Object configurationObject) { - this.dataQueue = Queues.newConcurrentLinkedQueue(); + this.dataQueue = new ConcurrentLinkedQueue<>(); this.isCompleted = new AtomicBoolean(false); } @@ -171,7 +171,7 @@ public abstract class InstagramAbstractProvider implements StreamsProvider { * @param tokenStrings */ public void setAuthorizedUserTokens(Collection<String> tokenStrings) { - ensureUsersInfo(this.config).setAuthorizedTokens(Sets.newHashSet(tokenStrings)); + ensureUsersInfo(this.config).setAuthorizedTokens(new HashSet<>(tokenStrings)); } /** @@ -197,7 +197,7 @@ public abstract class InstagramAbstractProvider implements StreamsProvider { * @param usersWithAfterDate instagram user id mapped to BeforeDate time */ public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) { - Set<User> users = Sets.newHashSet(); + Set<User> users = new HashSet<>(); for(String userId : usersWithAfterDate.keySet()) { User user = new User(); user.setUserId(userId); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java index eda9b4a..9a31b5a 100644 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java @@ -16,10 +16,7 @@ package org.apache.streams.instagram.provider.recentmedia; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.Queues; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -28,18 +25,10 @@ import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.instagram.*; +import org.apache.streams.instagram.InstagramConfiguration; import org.apache.streams.instagram.provider.InstagramAbstractProvider; import org.apache.streams.instagram.provider.InstagramDataCollector; -import org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaCollector; -import org.apache.streams.instagram.provider.userinfo.InstagramUserInfoProvider; import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.ComponentUtils; -import org.apache.streams.util.SerializationUtil; -import org.jinstagram.entity.users.feed.MediaFeedData; -import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,16 +36,8 @@ import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.PrintStream; -import java.math.BigInteger; -import java.util.Collection; import java.util.Iterator; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; /** * Instagram {@link org.apache.streams.core.StreamsProvider} that provides the recent media data for a group of users http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java deleted file mode 100644 index 4c3ba06..0000000 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.data.moreover; - -import com.google.common.collect.Lists; -import com.typesafe.config.Config; -import org.apache.streams.moreover.MoreoverConfiguration; -import org.apache.streams.moreover.MoreoverKeyData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * Created by sblackmon on 12/10/13. - */ -public class MoreoverConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverConfigurator.class); - - public static MoreoverConfiguration detectConfiguration(Config moreover) { - - MoreoverConfiguration moreoverConfiguration = new MoreoverConfiguration(); - - List<MoreoverKeyData> apiKeys = Lists.newArrayList(); - - Config apiKeysConfig = moreover.getConfig("apiKeys"); - - if( !apiKeysConfig.isEmpty()) - for( String apiKeyId : apiKeysConfig.root().keySet() ) { - Config apiKeyConfig = apiKeysConfig.getConfig(apiKeyId); - apiKeys.add(new MoreoverKeyData() - .withId(apiKeyConfig.getString("key")) - .withKey(apiKeyConfig.getString("key")) - .withStartingSequence(apiKeyConfig.getString("startingSequence"))); - } - moreoverConfiguration.setApiKeys(apiKeys); - - return moreoverConfiguration; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamConfigurator.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamConfigurator.java deleted file mode 100644 index e1a7033..0000000 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamConfigurator.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.rss.provider; - -import com.google.common.collect.Lists; -import com.typesafe.config.Config; -import org.apache.streams.rss.FeedDetails; -import org.apache.streams.rss.RssStreamConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * Created by sblackmon on 12/10/13. - */ -public class RssStreamConfigurator { - - private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamConfigurator.class); - - public static RssStreamConfiguration detectConfiguration(Config rss) { - - RssStreamConfiguration rssStreamConfiguration = new RssStreamConfiguration(); - - List<FeedDetails> feeds = Lists.newArrayList(); - feeds.add(new FeedDetails().withUrl(rss.getString("url"))); - - rssStreamConfiguration.setFeeds(feeds); - return rssStreamConfiguration; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java index a0e8ea1..d7dc918 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java @@ -22,16 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Queues; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; -import com.sun.syndication.feed.synd.SyndEntry; -import com.sun.syndication.feed.synd.SyndFeed; -import com.sun.syndication.io.FeedException; -import com.sun.syndication.io.SyndFeedInput; -import com.sun.syndication.io.XmlReader; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; @@ -53,14 +44,19 @@ import org.slf4j.LoggerFactory; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; -import java.io.IOException; import java.io.PrintStream; -import java.io.Serializable; import java.math.BigInteger; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -84,21 +80,21 @@ public class RssStreamProvider implements StreamsProvider { private RssStreamConfiguration config; private boolean perpetual; - private Set<String> urlFeeds; private ExecutorService executor; private BlockingQueue<StreamsDatum> dataQueue; private AtomicBoolean isComplete; - private int consecutiveEmptyReads; @VisibleForTesting protected RssFeedScheduler scheduler; public RssStreamProvider() { - this(RssStreamConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("rss")), false); + this(new ComponentConfigurator<>(RssStreamConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), false); } public RssStreamProvider(boolean perpetual) { - this(RssStreamConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("rss")), perpetual); + this(new ComponentConfigurator<>(RssStreamConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), perpetual); } public RssStreamProvider(RssStreamConfiguration config) { @@ -120,14 +116,13 @@ public class RssStreamProvider implements StreamsProvider { } public void setRssFeeds(Set<String> urlFeeds) { - this.urlFeeds = urlFeeds; } public void setRssFeeds(Map<String, Long> feeds) { if(this.config == null) { this.config = new RssStreamConfiguration(); } - List<FeedDetails> feedDetails = Lists.newLinkedList(); + List<FeedDetails> feedDetails = new ArrayList<>(); for(String feed : feeds.keySet()) { Long delay = feeds.get(feed); FeedDetails detail = new FeedDetails(); @@ -146,7 +141,7 @@ public class RssStreamProvider implements StreamsProvider { @Override public StreamsResultSet readCurrent() { - Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue(); + Queue<StreamsDatum> batch = new ConcurrentLinkedQueue<>(); int batchSize = 0; while(!this.dataQueue.isEmpty() && batchSize < MAX_SIZE) { StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.dataQueue); @@ -177,10 +172,10 @@ public class RssStreamProvider implements StreamsProvider { @Override public void prepare(Object configurationObject) { this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); - this.dataQueue = Queues.newLinkedBlockingQueue(); + this.dataQueue = new LinkedBlockingQueue<>(); this.scheduler = getScheduler(this.dataQueue); this.isComplete = new AtomicBoolean(false); - this.consecutiveEmptyReads = 0; + int consecutiveEmptyReads = 0; } @VisibleForTesting @@ -222,9 +217,7 @@ public class RssStreamProvider implements StreamsProvider { provider.startStream(); do { Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); - while(iterator.hasNext()) { - StreamsDatum datum = iterator.next(); + for (StreamsDatum datum : provider.readCurrent()) { String json; try { json = mapper.writeValueAsString(datum.getDocument()); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java index efbff46..8b483a1 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import org.apache.streams.data.ActivitySerializer; import org.apache.streams.data.util.RFC3339Utils; import org.apache.streams.jackson.StreamsJacksonMapper; @@ -37,8 +36,8 @@ import org.joda.time.DateTimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.LinkedList; import java.util.List; -import java.util.Map; public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNode> { @@ -57,7 +56,7 @@ public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNod @Override public List<Activity> deserializeAll(List<ObjectNode> objectNodes) { - List<Activity> result = Lists.newLinkedList(); + List<Activity> result = new LinkedList<>(); for (ObjectNode node : objectNodes) { result.add(deserialize(node)); } @@ -180,7 +179,7 @@ public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNod if (entry.get("uri") != null) uri = entry.get("uri").textValue(); - /** + /* * Order of precedence for resourceLocation selection * * 1. Valid URI @@ -209,9 +208,7 @@ public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNod * @return boolean of whether or not the resource is valid */ private boolean isValidResource(String resource) { - if(resource != null && resource.startsWith("http") || resource.startsWith("www")) - return true; - return false; + return resource != null && (resource.startsWith("http") || resource.startsWith("www")); } /** http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/config/SysomosConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/config/SysomosConfigurator.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/config/SysomosConfigurator.java deleted file mode 100644 index f503258..0000000 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/config/SysomosConfigurator.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.sysomos.config; - -import com.sysomos.SysomosConfiguration; -import com.typesafe.config.Config; - -/** - * Creates a {@link com.sysomos.SysomosConfiguration} instance from a {@link com.typesafe.config.Config} object. - */ -public class SysomosConfigurator { - public static SysomosConfiguration detectConfiguration(Config config) { - SysomosConfiguration sysomos = new SysomosConfiguration(); - sysomos.setHeartbeatIds(config.getStringList("heartbeatIds")); - sysomos.setApiBatchSize(config.getLong("apiBatchSize")); - sysomos.setApiKey(config.getString("apiKey")); - sysomos.setMinDelayMs(config.getLong("minDelayMs")); - sysomos.setScheduledDelayMs(config.getLong("scheduledDelayMs")); - sysomos.setMaxBatchSize(config.getLong("maxBatchSize")); - return sysomos; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java index 8330167..217d5dc 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java @@ -20,7 +20,7 @@ package org.apache.streams.twitter.processor; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; @@ -32,7 +32,6 @@ import org.apache.streams.twitter.converter.StreamsTwitterMapper; import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; -import org.apache.streams.twitter.provider.TwitterConfigurator; import org.apache.streams.twitter.provider.TwitterEventClassifier; import org.apache.streams.twitter.provider.TwitterProviderUtil; import org.slf4j.Logger; @@ -44,6 +43,7 @@ import twitter4j.TwitterFactory; import twitter4j.TwitterObjectFactory; import twitter4j.conf.ConfigurationBuilder; +import java.util.ArrayList; import java.util.List; import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.getProvider; @@ -69,7 +69,8 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor { private int retryCount; public FetchAndReplaceTwitterProcessor() { - this(TwitterConfigurator.detectTwitterStreamConfiguration(StreamsConfigurator.config.getConfig("twitter"))); + this(new ComponentConfigurator<>(TwitterStreamConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter"))); } public FetchAndReplaceTwitterProcessor(TwitterStreamConfiguration config) { @@ -92,7 +93,7 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor { } else { throw new IllegalStateException("Requires an activity document"); } - return Lists.newArrayList(entry); + return new ArrayList<>(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java index eac1218..3765229 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java @@ -107,7 +107,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); - PrintStream outStream = null; + PrintStream outStream; try { outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); } catch (FileNotFoundException e) { @@ -164,8 +164,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat } public TwitterStreamProvider() { - Config config = StreamsConfigurator.config.getConfig("twitter"); - this.config = TwitterConfigurator.detectTwitterStreamConfiguration(config); + this.config = new ComponentConfigurator<>(TwitterStreamConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter")); } public TwitterStreamProvider(TwitterStreamConfiguration config) { @@ -287,9 +287,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat return; } - LOGGER.debug("host={}\tendpoint={}\taut={}", new Object[] {hosebirdHosts,endpoint,auth}); + LOGGER.debug("host={}\tendpoint={}\taut={}", hosebirdHosts, endpoint, auth); - providerQueue = new LinkedBlockingQueue<Future<List<StreamsDatum>>>(MAX_BATCH); + providerQueue = new LinkedBlockingQueue<>(MAX_BATCH); client = new ClientBuilder() .name("apache/streams/streams-contrib/streams-provider-twitter") http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeConfigurator.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeConfigurator.java deleted file mode 100644 index e6f8ccf..0000000 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeConfigurator.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.youtube.provider; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.youtube.pojo.YoutubeConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class YoutubeConfigurator { - private final static Logger LOGGER = LoggerFactory.getLogger(YoutubeConfigurator.class); - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - public static YoutubeConfiguration detectConfiguration(Config config) { - YoutubeConfiguration configuration = null; - - try { - configuration = MAPPER.readValue(config.root().render(ConfigRenderOptions.concise()), YoutubeConfiguration.class); - } catch (IOException e) { - LOGGER.error("Exception while trying to use YoutubeConfigurator: {}", e); - } - - Preconditions.checkNotNull(configuration); - - return configuration; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java index e16f4bb..ab77467 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java @@ -34,7 +34,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.Config; +import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; @@ -53,17 +53,15 @@ import java.io.IOException; import java.math.BigInteger; import java.security.GeneralSecurityException; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import static java.util.concurrent.Executors.newSingleThreadExecutor; - public abstract class YoutubeProvider implements StreamsProvider { public static final String STREAMS_ID = "YoutubeProvider"; @@ -73,7 +71,7 @@ public abstract class YoutubeProvider implements StreamsProvider { // This OAuth 2.0 access scope allows for full read/write access to the // authenticated user's account. - List<String> scopes = Lists.newArrayList("https://www.googleapis.com/auth/youtube"); + private List<String> scopes = Lists.newArrayList("https://www.googleapis.com/auth/youtube"); /** * Define a global instance of the HTTP transport. @@ -87,7 +85,7 @@ public abstract class YoutubeProvider implements StreamsProvider { private static final int DEFAULT_THREAD_POOL_SIZE = 5; - List<ListenableFuture<Object>> futures = new ArrayList<>(); + private List<ListenableFuture<Object>> futures = new ArrayList<>(); private ListeningExecutorService executor; private BlockingQueue<StreamsDatum> datumQueue; @@ -98,8 +96,8 @@ public abstract class YoutubeProvider implements StreamsProvider { protected YoutubeConfiguration config; public YoutubeProvider() { - Config config = StreamsConfigurator.config.getConfig("youtube"); - this.config = YoutubeConfigurator.detectConfiguration(config); + this.config = new ComponentConfigurator<>(YoutubeConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("youtube")); Preconditions.checkNotNull(this.config.getApiKey()); } @@ -228,7 +226,7 @@ public abstract class YoutubeProvider implements StreamsProvider { * @param userIds */ public void setUserInfoWithDefaultDates(Set<String> userIds) { - List<UserInfo> youtubeUsers = Lists.newLinkedList(); + List<UserInfo> youtubeUsers = new LinkedList<>(); for(String userId : userIds) { UserInfo user = new UserInfo(); @@ -246,7 +244,7 @@ public abstract class YoutubeProvider implements StreamsProvider { * @param usersAndAfterDates */ public void setUserInfoWithAfterDate(Map<String, DateTime> usersAndAfterDates) { - List<UserInfo> youtubeUsers = Lists.newLinkedList(); + List<UserInfo> youtubeUsers = new LinkedList<>(); for(String userId : usersAndAfterDates.keySet()) { UserInfo user = new UserInfo(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java deleted file mode 100644 index ccfff3f..0000000 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMulitThreadTest.java +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.streams.local.queues; - -import com.carrotsearch.randomizedtesting.RandomizedTest; -import com.carrotsearch.randomizedtesting.annotations.Repeat; -import org.apache.streams.util.ComponentUtils; -import org.joda.time.DateTime; -import org.junit.After; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.management.InstanceNotFoundException; -import javax.management.ObjectName; -import java.lang.management.ManagementFactory; -import java.util.concurrent.*; - -/** - * MultiThread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue} - */ -public class ThroughputQueueMulitThreadTest extends RandomizedTest { - - private final static Logger LOGGER = LoggerFactory.getLogger(ThroughputQueueMulitThreadTest.class); - private static final String MBEAN_ID = "testQueue"; - private static final String STREAM_ID = "test_stream"; - private static long STREAM_START_TIME = (new DateTime()).getMillis(); - - /** - * Remove registered mbeans from previous tests - * @throws Exception - */ - @After - public void unregisterMXBean() throws Exception { - try { - ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME))); - } catch (InstanceNotFoundException ife) { - //No-op - } - } - - @After - public void removeLocalMBeans() { - try { - ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); - } catch (Exception e) { - //No op. proceed to next test - } - } - - - /** - * Test that queue will block on puts when the queue is full - * @throws InterruptedException - */ - @Test - public void testBlockOnFullQueue() throws InterruptedException { - int queueSize = randomIntBetween(1, 3000); - ExecutorService executor = Executors.newSingleThreadExecutor(); - CountDownLatch full = new CountDownLatch(1); - CountDownLatch finished = new CountDownLatch(1); - ThroughputQueue queue = new ThroughputQueue(queueSize); - BlocksOnFullQueue testThread = new BlocksOnFullQueue(full, finished, queue, queueSize); - executor.submit(testThread); - full.await(); - assertEquals(queueSize, queue.size()); - assertEquals(queueSize, queue.getCurrentSize()); - assertFalse(testThread.isComplete()); //test that it is blocked - safeSleep(1000); - assertFalse(testThread.isComplete()); //still blocked - queue.take(); - finished.await(); - assertEquals(queueSize, queue.size()); - assertEquals(queueSize, queue.getCurrentSize()); - assertTrue(testThread.isComplete()); - executor.shutdownNow(); - executor.awaitTermination(500, TimeUnit.MILLISECONDS); - } - - /** - * Test that queue will block on Take when queue is empty - * @throws InterruptedException - */ - @Test - public void testBlockOnEmptyQueue() throws InterruptedException { - int queueSize = randomIntBetween(1, 3000); - ExecutorService executor = Executors.newSingleThreadExecutor(); - CountDownLatch empty = new CountDownLatch(1); - CountDownLatch finished = new CountDownLatch(1); - ThroughputQueue queue = new ThroughputQueue(); - BlocksOnEmptyQueue testThread = new BlocksOnEmptyQueue(empty, finished, queueSize, queue); - for(int i=0; i < queueSize; ++i) { - queue.put(i); - } - executor.submit(testThread); - empty.await(); - assertEquals(0, queue.size()); - assertEquals(0, queue.getCurrentSize()); - assertFalse(testThread.isComplete()); - safeSleep(1000); - assertFalse(testThread.isComplete()); - queue.put(1); - finished.await(); - assertEquals(0, queue.size()); - assertEquals(0, queue.getCurrentSize()); - assertTrue(testThread.isComplete()); - executor.shutdownNow(); - executor.awaitTermination(500, TimeUnit.MILLISECONDS); - } - - - /** - * Test multiple threads putting and taking from the queue while - * this thread repeatedly calls the MXBean measurement methods. - * Should hammer the queue with request from multiple threads - * of all request types. Purpose is to expose current modification exceptions - * and/or dead locks. - */ - @Test - @Repeat(iterations = 3) - public void testMultiThreadAccessAndInteruptResponse() throws Exception { - int putTakeThreadCount = randomIntBetween(1, 10); - int dataCount = randomIntBetween(1, 2000000); - int pollCount = randomIntBetween(1, 2000000); - int maxSize = randomIntBetween(1, 1000); - CountDownLatch finished = new CountDownLatch(putTakeThreadCount); - ThroughputQueue queue = new ThroughputQueue(maxSize, MBEAN_ID); - ExecutorService executor = Executors.newFixedThreadPool(putTakeThreadCount * 2); - for(int i=0; i < putTakeThreadCount; ++i) { - executor.submit(new PutData(finished, queue, dataCount)); - executor.submit(new TakeData(queue)); - } - for(int i=0; i < pollCount; ++i) { - queue.getAvgWait(); - queue.getAdded(); - queue.getCurrentSize(); - queue.getMaxWait(); - queue.getRemoved(); - queue.getThroughput(); - } - finished.await(); - while(!queue.isEmpty()) { - LOGGER.info("Waiting for queue to be emptied..."); - safeSleep(500); - } - long totalData = ((long) dataCount) * putTakeThreadCount; - assertEquals(totalData, queue.getAdded()); - assertEquals(totalData, queue.getRemoved()); - executor.shutdown(); - executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown puts - executor.shutdownNow(); - executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown takes - //Randomized should not report thread leak - } - - - - private void safeSleep(long sleep) { - try { - Thread.sleep(sleep); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - } - - - - - /** - * Helper runnable for test {@link ThroughputQueueMulitThreadTest#testBlockOnFullQueue()} - */ - private class BlocksOnFullQueue implements Runnable { - - private CountDownLatch full; - volatile private boolean complete; - private int queueSize; - private CountDownLatch finished; - private BlockingQueue queue; - - public BlocksOnFullQueue(CountDownLatch latch, CountDownLatch finished, BlockingQueue queue, int queueSize) { - this.full = latch; - this.complete = false; - this.queueSize = queueSize; - this.finished = finished; - this.queue = queue; - } - - @Override - public void run() { - try { - for (int i = 0; i < this.queueSize; ++i) { - this.queue.put(i); - } - this.full.countDown(); - this.queue.put(0); - this.complete = true; - this.finished.countDown(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - } - - public boolean isComplete() { - return this.complete; - } - } - - - /** - * Helper runnable class for test {@link ThroughputQueueMulitThreadTest#testBlockOnEmptyQueue()} - */ - private class BlocksOnEmptyQueue implements Runnable { - - private CountDownLatch full; - volatile private boolean complete; - private int queueSize; - private CountDownLatch finished; - private BlockingQueue queue; - - public BlocksOnEmptyQueue(CountDownLatch full, CountDownLatch finished, int queueSize, BlockingQueue queue) { - this.full = full; - this.finished = finished; - this.queueSize = queueSize; - this.queue = queue; - this.complete = false; - } - - - @Override - public void run() { - try { - for(int i=0; i < this.queueSize; ++i) { - this.queue.take(); - } - this.full.countDown(); - this.queue.take(); - this.complete = true; - this.finished.countDown(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - } - - public boolean isComplete() { - return this.complete; - } - } - - - private class PutData implements Runnable { - - private BlockingQueue queue; - private int dataCount; - private CountDownLatch finished; - - public PutData(CountDownLatch finished, BlockingQueue queue, int dataCount) { - this.queue = queue; - this.dataCount = dataCount; - this.finished = finished; - } - - - @Override - public void run() { - try { - for(int i=0; i < this.dataCount; ++i) { - this.queue.put(i); - } - } catch (InterruptedException ie) { - LOGGER.error("PUT DATA interupted !"); - Thread.currentThread().interrupt(); - } - this.finished.countDown(); - } - } - - - private class TakeData implements Runnable { - - private BlockingQueue queue; - - public TakeData(BlockingQueue queue) { - this.queue = queue; - } - - - @Override - public void run() { - try { - while(true) { - this.queue.take(); - } - } catch (InterruptedException ie) { - LOGGER.error("PUT DATA interupted !"); - Thread.currentThread().interrupt(); - } - } - } - -}
