http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java index 64cc0e8..71447cb 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java @@ -19,8 +19,9 @@ package org.apache.streams.sysomos.provider; -import com.sysomos.xml.BeatApi; import org.apache.streams.core.StreamsDatum; + +import com.sysomos.xml.BeatApi; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,208 +31,228 @@ import org.slf4j.LoggerFactory; */ public class SysomosHeartbeatStream implements Runnable { - private static enum OperatingMode { DATE, DOC_MATCH} - - private final static Logger LOGGER = LoggerFactory.getLogger(SysomosHeartbeatStream.class); - - private final SysomosProvider provider; - private final SysomosClient client; - private final String heartbeatId; - private final long maxApiBatch; - private final long minLatency; - private final OperatingMode mode; - - private String lastID; - private DateTime beforeTime; - private DateTime afterTime; - private DateTime lastRunTime; - private int offsetCount = 0; - private boolean enabled = true; - - public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId) { - this(provider, heartbeatId, null, DateTime.now()); + private enum OperatingMode { DATE, DOC_MATCH } + + private static final Logger LOGGER = LoggerFactory.getLogger(SysomosHeartbeatStream.class); + + private final SysomosProvider provider; + private final SysomosClient client; + private final String heartbeatId; + private final long maxApiBatch; + private final long minLatency; + private final OperatingMode mode; + + private String lastId; + private DateTime beforeTime; + private DateTime afterTime; + private DateTime lastRunTime; + private int offsetCount = 0; + private boolean enabled = true; + + public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId) { + this(provider, heartbeatId, null, DateTime.now()); + } + + /** + * SysomosHeartbeatStream constructor. + * @param provider SysomosProvider + * @param heartbeatId heartbeatId + * @param beforeTime DateTime + * @param afterTime DateTime + */ + public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, DateTime beforeTime, DateTime afterTime) { + this(provider, heartbeatId, OperatingMode.DATE); + this.beforeTime = beforeTime; + this.afterTime = afterTime; + } + + /** + * SysomosHeartbeatStream constructor. + * @param provider SysomosProvider + * @param heartbeatId heartbeatId + * @param documentId last documentId + */ + public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, String documentId) { + this(provider, heartbeatId, OperatingMode.DOC_MATCH); + this.lastId = documentId; + } + + /** + * SysomosHeartbeatStream constructor. 
+ * @param provider SysomosProvider + * @param heartbeatId heartbeatId + * @param mode OperatingMode + */ + public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, OperatingMode mode) { + this.provider = provider; + this.heartbeatId = heartbeatId; + + this.client = provider.getClient(); + this.maxApiBatch = provider.getMaxApiBatch(); + this.minLatency = provider.getMinLatency(); + this.mode = mode; + } + + @Override + public void run() { + try { + executeRun(); + } catch (Exception ex) { + LOGGER.error("Error executing heartbeat stream", ex); + shutdown(); } - - public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, DateTime beforeTime, DateTime afterTime) { - this(provider, heartbeatId, OperatingMode.DATE); - this.beforeTime = beforeTime; - this.afterTime = afterTime; + } + + protected void executeRun() { + QueryResult result; + String mostCurrentId = null; + int totalDocCount = 0; + lastRunTime = DateTime.now(); + //Iff we are trying to get to a specific document ID, continue to query after minimum delay + do { + LOGGER.debug("Querying API to match last ID of {} or time range of {} - {}", lastId, afterTime, beforeTime); + result = queryApi(); + totalDocCount += result.getResponseSize(); + //Ensure that we are only assigning lastId to the latest ID, even if there is backfill query. + //Since offset is calcuated at the end of the run, if we detect the need to backfill, it will increment to 1 + if (offsetCount == 1) { + mostCurrentId = result.getCurrentId(); + } + updateOffset(result); } - - public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, String documentId) { - this(provider, heartbeatId, OperatingMode.DOC_MATCH); - this.lastID = documentId; + while (offsetCount > 0); + + updateState(result, mostCurrentId, totalDocCount); + LOGGER.debug("Completed current execution with a final docID of {} or time of {}", lastId, afterTime); + } + + protected void updateState(QueryResult result, String mostCurrentId, int totalDocCount) { + if (OperatingMode.DOC_MATCH.equals(mode)) { + //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't + //found the specific ID + lastId = mostCurrentId == null ? result.getCurrentId() : mostCurrentId; + } else { + //If we didn't see any docs, there might be a lag on the Sysomos side. Retry. + afterTime = totalDocCount == 0 ? afterTime : lastRunTime; } - public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, OperatingMode mode) { - this.provider = provider; - this.heartbeatId = heartbeatId; - - this.client = provider.getClient(); - this.maxApiBatch = provider.getMaxApiBatch(); - this.minLatency = provider.getMinLatency(); - this.mode = mode; + if (SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) { + shutdown(); + LOGGER.info("Completed backfill to {} for heartbeat {}", OperatingMode.DOC_MATCH.equals(mode) ? lastId : afterTime, heartbeatId); } - - @Override - public void run() { - try { - executeRun(); - } catch (Exception e) { - LOGGER.error("Error executing heartbeat stream", e); - shutdown(); - } + } + + protected void updateOffset(QueryResult result) { + if (OperatingMode.DOC_MATCH.equals(mode)) { + //Reset the offset iff we have found a match or this is the first execution + offsetCount = lastId == null || result.isMatchedLastId() ? 0 : offsetCount + 1; + } else { + offsetCount = result.getResponseSize() == 0 ? 
0 : offsetCount + 1; } - - protected void executeRun() { - QueryResult result; - String mostCurrentId = null; - int totalDocCount = 0; - lastRunTime = DateTime.now(); - //Iff we are trying to get to a specific document ID, continue to query after minimum delay - do { - LOGGER.debug("Querying API to match last ID of {} or time range of {} - {}", lastID, afterTime, beforeTime); - result = queryAPI(); - totalDocCount += result.getResponseSize(); - //Ensure that we are only assigning lastID to the latest ID, even if there is backfill query. - //Since offset is calcuated at the end of the run, if we detect the need to backfill, it will increment to 1 - if(offsetCount == 1) { - mostCurrentId = result.getCurrentId(); - } - updateOffset(result); - } while (offsetCount > 0); - - updateState(result, mostCurrentId, totalDocCount); - LOGGER.debug("Completed current execution with a final docID of {} or time of {}", lastID, afterTime); - } - - protected void updateState(QueryResult result, String mostCurrentId, int totalDocCount) { - if(OperatingMode.DOC_MATCH.equals(mode)) { - //Set the last ID so that the next time we are executed we will continue to query only so long as we haven't - //found the specific ID - lastID = mostCurrentId == null ? result.getCurrentId() : mostCurrentId; - } else { - //If we didn't see any docs, there might be a lag on the Sysomos side. Retry. - afterTime = totalDocCount == 0 ? afterTime : lastRunTime; - } - - if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) { - shutdown(); - LOGGER.info("Completed backfill to {} for heartbeat {}", OperatingMode.DOC_MATCH.equals(mode) ? lastID : afterTime, heartbeatId); - } + if (offsetCount > 0) { + sleep(); } - - protected void updateOffset(QueryResult result) { - if(OperatingMode.DOC_MATCH.equals(mode)) { - //Reset the offset iff we have found a match or this is the first execution - offsetCount = lastID == null || result.isMatchedLastId() ? 0 : offsetCount + 1; - } else { - offsetCount = result.getResponseSize() == 0 ? 0 : offsetCount + 1; - } - if(offsetCount > 0) { - sleep(); - } + } + + protected void sleep() { + try { + Thread.sleep(this.minLatency); + } catch (InterruptedException ex) { + LOGGER.warn("Thread interrupted while sleeping minimum delay", ex); + shutdown(); } - - protected void sleep() { - try { - Thread.sleep(this.minLatency); - } catch (InterruptedException e) { - LOGGER.warn("Thread interrupted while sleeping minimum delay", e); - shutdown(); + } + + protected QueryResult queryApi() { + BeatApi.BeatResponse response = executeApiRequest(); + + String currentId = null; + boolean matched = false; + int responseSize = 0; + if (response != null) { + for (BeatApi.BeatResponse.Beat beat : response.getBeat()) { + String docId = beat.getDocid(); + //We get documents in descending time order. 
This will set the id to the latest document + if (currentId == null) { + currentId = docId; + } + //We only want to process documents that we know we have not seen before + if (lastId != null && lastId.equals(docId)) { + matched = true; + break; } + StreamsDatum item = new StreamsDatum(beat, docId); + item.getMetadata().put("heartbeat", this.heartbeatId); + this.provider.enqueueItem(item); + } + responseSize = response.getCount(); } - - protected QueryResult queryAPI() { - BeatApi.BeatResponse response = executeAPIRequest(); - - String currentId = null; - boolean matched = false; - int responseSize = 0; - if(response != null) { - for (BeatApi.BeatResponse.Beat beat : response.getBeat()) { - String docId = beat.getDocid(); - //We get documents in descending time order. This will set the id to the latest document - if (currentId == null) { - currentId = docId; - } - //We only want to process documents that we know we have not seen before - if (lastID != null && lastID.equals(docId)) { - matched = true; - break; - } - StreamsDatum item = new StreamsDatum(beat, docId); - item.getMetadata().put("heartbeat", this.heartbeatId); - this.provider.enqueueItem(item); - } - responseSize = response.getCount(); + return new QueryResult(matched, currentId, responseSize); + } + + protected BeatApi.BeatResponse executeApiRequest() { + BeatApi.BeatResponse response = null; + try { + if (enabled) { + RequestBuilder requestBuilder = this.client.createRequestBuilder() + .setHeartBeatId(heartbeatId) + .setOffset(offsetCount * maxApiBatch) + .setReturnSetSize(maxApiBatch); + if (beforeTime != null) { + requestBuilder.setAddedBeforeDate(beforeTime); } - return new QueryResult(matched, currentId, responseSize); - } - - protected BeatApi.BeatResponse executeAPIRequest() { - BeatApi.BeatResponse response = null; - try { - if(enabled) { - RequestBuilder requestBuilder = this.client.createRequestBuilder() - .setHeartBeatId(heartbeatId) - .setOffset(offsetCount * maxApiBatch) - .setReturnSetSize(maxApiBatch); - if(beforeTime != null) { - requestBuilder.setAddedBeforeDate(beforeTime); - } - if(afterTime != null) { - requestBuilder.setAddedAfterDate(afterTime); - } - response = requestBuilder.execute(); - - LOGGER.debug("Received {} results from API query", response.getCount()); - } - } catch (Exception e) { - LOGGER.warn("Error querying Sysomos API", e); + if (afterTime != null) { + requestBuilder.setAddedAfterDate(afterTime); } - return response; - } + response = requestBuilder.execute(); - protected void shutdown() { - provider.signalComplete(heartbeatId); - enabled = false; + LOGGER.debug("Received {} results from API query", response.getCount()); + } + } catch (Exception ex) { + LOGGER.warn("Error querying Sysomos API", ex); } + return response; + } - protected class QueryResult { - private boolean matchedLastId; - private String currentId; - private int responseSize; + protected void shutdown() { + provider.signalComplete(heartbeatId); + enabled = false; + } + protected class QueryResult { + private boolean matchedLastId; + private String currentId; + private int responseSize; - public QueryResult(boolean matchedLastId, String currentId, int responseSize) { - this.matchedLastId = matchedLastId; - this.currentId = currentId; - this.responseSize = responseSize; - } - public boolean isMatchedLastId() { - return matchedLastId; - } + public QueryResult(boolean matchedLastId, String currentId, int responseSize) { + this.matchedLastId = matchedLastId; + this.currentId = currentId; + this.responseSize = responseSize; + } - 
public void setMatchedLastId(boolean matchedLastId) { - this.matchedLastId = matchedLastId; - } + public boolean isMatchedLastId() { + return matchedLastId; + } - public String getCurrentId() { - return currentId; - } + public void setMatchedLastId(boolean matchedLastId) { + this.matchedLastId = matchedLastId; + } - public void setCurrentId(String currentId) { - this.currentId = currentId; - } + public String getCurrentId() { + return currentId; + } - public int getResponseSize() { - return responseSize; - } + public void setCurrentId(String currentId) { + this.currentId = currentId; + } - public void setResponseSize(int responseSize) { - this.responseSize = responseSize; - } + public int getResponseSize() { + return responseSize; + } + + public void setResponseSize(int responseSize) { + this.responseSize = responseSize; } + } }
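The heartbeat stream above pages through the Sysomos beat API with an increasing offset until it either re-encounters the last document ID it already processed (DOC_MATCH mode) or the API returns an empty page (DATE mode), sleeping minLatency between pages and remembering the newest ID it saw so that the next scheduled run knows where to stop. Below is a minimal, self-contained sketch of that backfill-by-offset loop; the PageSource interface, the hard-coded document IDs, and the numeric settings are hypothetical stand-ins for the Sysomos client and provider configuration, not part of the Apache Streams API.

import java.util.Arrays;
import java.util.List;

/** Simplified illustration of the backfill-by-offset loop used by SysomosHeartbeatStream. */
public class BackfillLoopSketch {

  /** Stand-in for one API request: returns documents newest-first, as the beat API does. */
  interface PageSource {
    List<String> fetch(long offset, long batchSize);
  }

  public static void main(String[] args) throws InterruptedException {
    // Pretend the feed currently holds these documents, newest first.
    List<String> feed = Arrays.asList("doc-9", "doc-8", "doc-7", "doc-6", "doc-5");
    PageSource source = (offset, batch) ->
        feed.subList((int) Math.min(offset, feed.size()),
                     (int) Math.min(offset + batch, feed.size()));

    String lastSeenId = "doc-6"; // where the previous run stopped (DOC_MATCH mode)
    long batchSize = 2;          // analogous to maxApiBatch
    long minLatencyMs = 100;     // analogous to minLatency

    int offsetCount = 0;
    String mostCurrentId = null;
    do {
      List<String> page = source.fetch(offsetCount * batchSize, batchSize);
      boolean matched = false;
      for (String docId : page) {
        if (mostCurrentId == null) {
          mostCurrentId = docId; // newest ID of this run; becomes the next run's lastSeenId
        }
        if (docId.equals(lastSeenId)) {
          matched = true;        // caught up to the previous run, stop paging
          break;
        }
        System.out.println("enqueue " + docId); // the real stream enqueues a StreamsDatum here
      }
      // Keep backfilling until the last seen ID is found or the feed is exhausted.
      offsetCount = (matched || page.isEmpty()) ? 0 : offsetCount + 1;
      if (offsetCount > 0) {
        Thread.sleep(minLatencyMs); // honor the minimum delay between API calls
      }
    } while (offsetCount > 0);

    System.out.println("next run will stop at " + mostCurrentId);
  }
}

In the real provider this loop is scheduled repeatedly by SysomosProvider.startStream(), one task per heartbeat ID, and BACKFILL_AND_TERMINATE mode shuts the task down once the backfill target has been reached.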
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java index 824ede2..ec1f317 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java @@ -19,6 +19,15 @@ package org.apache.streams.sysomos.provider; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.data.util.RFC3339Utils; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; @@ -31,14 +40,6 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.data.util.RFC3339Utils; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +53,9 @@ import java.util.Iterator; import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -60,310 +63,336 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Streams Provider for the Sysomos Heartbeat API * + * <p/> * Configuration: - * The provider takes either a Map<String,Object> containing the mode (backfill and terminate OR continuous) and a - * Map<String,String> of heartbeat IDs to document target ids or a string of the format ${heartbeatId}:${documentId},...,${heartbeatId}:${documentId} + * The provider takes either a Map[String,Object] containing the mode (backfill and terminate OR continuous) and a + * Map[String,String] of heartbeat IDs to document target ids or a string of the format + * ${heartbeatId}:${documentId},...,${heartbeatId}:${documentId} * This configuration will configure the provider to backfill to the specified document and either terminate or not * depending on the mode flag. Continuous mode is assumed, and is the ony mode supported by the String configuration. 
* - * To use from command line: - * - * Supply configuration similar to src/test/resources/rss.conf - * - * Launch using: - * - * mvn exec:java -Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider -Dexec.args="rss.conf articles.json" */ public class SysomosProvider implements StreamsProvider { - public static final String STREAMS_ID = "SysomosProvider"; - - public static enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE } - - private static final Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class); - - public static final String ENDING_TIME_KEY = "addedBefore"; - public static final String STARTING_TIME_KEY = "addedAfter"; - public static final String MODE_KEY = "mode"; - public static final String STARTING_DOCS_KEY = "startingDocs"; - public static final int LATENCY = 10000; //Default minLatency for querying the Sysomos API in milliseconds - public static final long PROVIDER_BATCH_SIZE = 10000L; //Default maximum size of the queue - public static final long API_BATCH_SIZE = 1000L; //Default maximum size of an API request - - protected volatile Queue<StreamsDatum> providerQueue; - - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - private final Set<String> completedHeartbeats = Sets.newHashSet(); - private final long maxQueued; - private final long minLatency; - private final long scheduledLatency; - private final long maxApiBatch; - - private SysomosClient client; - private SysomosConfiguration config; - private ScheduledExecutorService stream; - private Map<String, String> documentIds; - private Map<String, String> addedBefore; - private Map<String, String> addedAfter; - private Mode mode = Mode.CONTINUOUS; - private boolean started = false; - private AtomicInteger count; - - public SysomosProvider(SysomosConfiguration sysomosConfiguration) { - this.config = sysomosConfiguration; - this.client = new SysomosClient(sysomosConfiguration.getApiKey()); - this.maxQueued = sysomosConfiguration.getMaxBatchSize() == null ? PROVIDER_BATCH_SIZE : sysomosConfiguration.getMaxBatchSize(); - this.minLatency = sysomosConfiguration.getMinDelayMs() == null ? LATENCY : sysomosConfiguration.getMinDelayMs(); - this.scheduledLatency = sysomosConfiguration.getScheduledDelayMs() == null ? (LATENCY * 15) : sysomosConfiguration.getScheduledDelayMs(); - this.maxApiBatch = sysomosConfiguration.getMinDelayMs() == null ? 
API_BATCH_SIZE : sysomosConfiguration.getApiBatchSize(); - this.count = new AtomicInteger(); - } - - public SysomosConfiguration getConfig() { - return config; - } - - public void setConfig(SysomosConfiguration config) { - this.config = config; - } - - public Mode getMode() { - return mode; - } - - public long getMinLatency() { - return minLatency; - } - - public long getMaxApiBatch() { - return maxApiBatch; + public static final String STREAMS_ID = "SysomosProvider"; + + public static enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE } + + private static final Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class); + + public static final String ENDING_TIME_KEY = "addedBefore"; + public static final String STARTING_TIME_KEY = "addedAfter"; + public static final String MODE_KEY = "mode"; + public static final String STARTING_DOCS_KEY = "startingDocs"; + public static final int LATENCY = 10000; //Default minLatency for querying the Sysomos API in milliseconds + public static final long PROVIDER_BATCH_SIZE = 10000L; //Default maximum size of the queue + public static final long API_BATCH_SIZE = 1000L; //Default maximum size of an API request + + protected volatile Queue<StreamsDatum> providerQueue; + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final Set<String> completedHeartbeats = Sets.newHashSet(); + private final long maxQueued; + private final long minLatency; + private final long scheduledLatency; + private final long maxApiBatch; + + private SysomosClient client; + private SysomosConfiguration config; + private ScheduledExecutorService stream; + private Map<String, String> documentIds; + private Map<String, String> addedBefore; + private Map<String, String> addedAfter; + private Mode mode = Mode.CONTINUOUS; + private boolean started = false; + private AtomicInteger count; + + /** + * SysomosProvider constructor. + * @param sysomosConfiguration SysomosConfiguration + */ + public SysomosProvider(SysomosConfiguration sysomosConfiguration) { + this.config = sysomosConfiguration; + this.client = new SysomosClient(sysomosConfiguration.getApiKey()); + this.maxQueued = sysomosConfiguration.getMaxBatchSize() == null ? PROVIDER_BATCH_SIZE : sysomosConfiguration.getMaxBatchSize(); + this.minLatency = sysomosConfiguration.getMinDelayMs() == null ? LATENCY : sysomosConfiguration.getMinDelayMs(); + this.scheduledLatency = sysomosConfiguration.getScheduledDelayMs() == null + ? (LATENCY * 15) : sysomosConfiguration.getScheduledDelayMs(); + this.maxApiBatch = sysomosConfiguration.getMinDelayMs() == null ? API_BATCH_SIZE : sysomosConfiguration.getApiBatchSize(); + this.count = new AtomicInteger(); + } + + public SysomosConfiguration getConfig() { + return config; + } + + public void setConfig(SysomosConfiguration config) { + this.config = config; + } + + public Mode getMode() { + return mode; + } + + public long getMinLatency() { + return minLatency; + } + + public long getMaxApiBatch() { + return maxApiBatch; + } + + public SysomosClient getClient() { + return client; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void startStream() { + LOGGER.trace("Starting Producer"); + if (!started) { + LOGGER.trace("Producer not started. 
Initializing"); + stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1); + for (String heartbeatId : getConfig().getHeartbeatIds()) { + Runnable task = createStream(heartbeatId); + stream.scheduleWithFixedDelay(task, 0, this.scheduledLatency, TimeUnit.MILLISECONDS); + LOGGER.info("Started producer task for heartbeat {}", heartbeatId); + } + started = true; } - - public SysomosClient getClient() { - return client; + } + + @Override + public StreamsResultSet readCurrent() { + StreamsResultSet current; + try { + lock.writeLock().lock(); + LOGGER.debug("Creating new result set for {} items", providerQueue.size()); + count.addAndGet(providerQueue.size()); + current = new StreamsResultSet(providerQueue); + providerQueue = constructQueue(); + } finally { + lock.writeLock().unlock(); } - @Override - public String getId() { - return STREAMS_ID; + return current; + } + + @Override + public StreamsResultSet readNew(BigInteger bigInteger) { + throw new NotImplementedException("readNew not currently implemented"); + } + + @Override + public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) { + throw new NotImplementedException("readRange not currently implemented"); + } + + //If the provider queue still has data, we are still running. If not, we are running if we have not been signaled + //by all completed heartbeats so long as the thread pool is alive + @Override + public boolean isRunning() { + return providerQueue.size() > 0 + || (completedHeartbeats.size() < this.getConfig().getHeartbeatIds().size() + && !(stream.isTerminated() + || stream.isShutdown())); + } + + @Override + public void prepare(Object configurationObject) { + this.providerQueue = constructQueue(); + if (configurationObject instanceof Map) { + extractConfigFromMap((Map) configurationObject); + } else if (configurationObject instanceof String) { + documentIds = Splitter.on(";").trimResults().withKeyValueSeparator("=").split((String)configurationObject); } - - @Override - public void startStream() { - LOGGER.trace("Starting Producer"); - if (!started) { - LOGGER.trace("Producer not started. 
Initializing"); - stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1); - for (String heartbeatId : getConfig().getHeartbeatIds()) { - Runnable task = createStream(heartbeatId); - stream.scheduleWithFixedDelay(task, 0, this.scheduledLatency, TimeUnit.MILLISECONDS); - LOGGER.info("Started producer task for heartbeat {}", heartbeatId); - } - started = true; + } + + @Override + public void cleanUp() { + stream.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!stream.awaitTermination(60, TimeUnit.SECONDS)) { + stream.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!stream.awaitTermination(60, TimeUnit.SECONDS)) { + LOGGER.error("Stream did not terminate"); } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + stream.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); } - - @Override - public StreamsResultSet readCurrent() { - StreamsResultSet current; - try { - lock.writeLock().lock(); - LOGGER.debug("Creating new result set for {} items", providerQueue.size()); - count.addAndGet(providerQueue.size()); - current = new StreamsResultSet(providerQueue); - providerQueue = constructQueue(); - } finally { - lock.writeLock().unlock(); - } - - return current; + } + + /** + * signalComplete. + * @param heartbeatId heartbeatId + */ + public void signalComplete(String heartbeatId) { + try { + this.lock.writeLock().lock(); + this.completedHeartbeats.add(heartbeatId); + if (!this.isRunning()) { + this.cleanUp(); + } + } finally { + this.lock.writeLock().unlock(); } - @Override - public StreamsResultSet readNew(BigInteger bigInteger) { - throw new NotImplementedException("readNew not currently implemented"); + } + + protected void enqueueItem(StreamsDatum datum) { + boolean success; + do { + try { + pauseForSpace(); //Dont lock before this pause. We don't want to block the readCurrent method + lock.readLock().lock(); + success = providerQueue.offer(datum); + Thread.yield(); + } finally { + lock.readLock().unlock(); + } } + while (!success); + } - @Override - public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) { - throw new NotImplementedException("readRange not currently implemented"); - } + protected SysomosHeartbeatStream createStream(String heartbeatId) { + String afterTime = addedAfter != null && addedAfter.containsKey(heartbeatId) ? addedAfter.get(heartbeatId) : null; + String beforeTime = addedBefore != null && addedBefore.containsKey(heartbeatId) ? addedBefore.get(heartbeatId) : null; - //If the provider queue still has data, we are still running. 
If not, we are running if we have not been signaled - //by all completed heartbeats so long as the thread pool is alive - @Override - public boolean isRunning() { - return providerQueue.size() > 0 || (completedHeartbeats.size() < this.getConfig().getHeartbeatIds().size() && !(stream.isTerminated() || stream.isShutdown())); + if (documentIds != null && documentIds.containsKey(heartbeatId)) { + return new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId)); } - - @Override - public void prepare(Object configurationObject) { - this.providerQueue = constructQueue(); - if(configurationObject instanceof Map) { - extractConfigFromMap((Map) configurationObject); - } else if(configurationObject instanceof String) { - documentIds = Splitter.on(";").trimResults().withKeyValueSeparator("=").split((String)configurationObject); - } + if (afterTime != null) { + if (beforeTime != null) { + return new SysomosHeartbeatStream(this, heartbeatId, RFC3339Utils.parseToUTC(beforeTime), RFC3339Utils.parseToUTC(afterTime)); + } else { + return new SysomosHeartbeatStream(this, heartbeatId, null, RFC3339Utils.parseToUTC(afterTime)); + } } - - @Override - public void cleanUp() { - stream.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!stream.awaitTermination(60, TimeUnit.SECONDS)) { - stream.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!stream.awaitTermination(60, TimeUnit.SECONDS)) { - LOGGER.error("Stream did not terminate"); - } - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - stream.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } + return new SysomosHeartbeatStream(this, heartbeatId); + } + + /** + * Wait for the queue size to be below threshold before allowing execution to continue on this thread. + */ + protected void pauseForSpace() { + while (this.providerQueue.size() >= maxQueued) { + LOGGER.trace("Sleeping the current thread due to a full queue"); + try { + Thread.sleep(100); + LOGGER.trace("Resuming thread after wait period"); + } catch (InterruptedException ex) { + LOGGER.warn("Thread was interrupted", ex); + } } - - public void signalComplete(String heartbeatId) { - try { - this.lock.writeLock().lock(); - this.completedHeartbeats.add(heartbeatId); - if(!this.isRunning()) { - this.cleanUp(); - } - } finally { - this.lock.writeLock().unlock(); - } - + } + + @SuppressWarnings("unchecked") + protected void extractConfigFromMap(Map configMap) { + if (configMap.containsKey(MODE_KEY)) { + Object configMode = configMap.get(MODE_KEY); + if (!(configMode instanceof Mode)) { + throw new IllegalStateException("Invalid configuration. Mode must be an instance of the Mode enum but was " + configMode); + } + this.mode = (Mode)configMode; } - - protected void enqueueItem(StreamsDatum datum) { - boolean success; - do { - try { - pauseForSpace(); //Dont lock before this pause. We don't want to block the readCurrent method - lock.readLock().lock(); - success = providerQueue.offer(datum); - Thread.yield(); - }finally { - lock.readLock().unlock(); - } - } - while (!success); + if (configMap.containsKey(STARTING_DOCS_KEY)) { + Object configIds = configMap.get(STARTING_DOCS_KEY); + if (!(configIds instanceof Map)) { + throw new IllegalStateException("Invalid configuration. 
StartingDocs must be an instance of Map<String,String> but was " + + configIds); + } + this.documentIds = (Map)configIds; } - - protected SysomosHeartbeatStream createStream(String heartbeatId) { - String afterTime = addedAfter != null && addedAfter.containsKey(heartbeatId) ? addedAfter.get(heartbeatId) : null; - String beforeTime = addedBefore != null && addedBefore.containsKey(heartbeatId) ? addedBefore.get(heartbeatId) : null; - - if(documentIds != null && documentIds.containsKey(heartbeatId)) { - return new SysomosHeartbeatStream(this, heartbeatId, documentIds.get(heartbeatId)); - } - if(afterTime != null) { - if(beforeTime != null) { - return new SysomosHeartbeatStream(this, heartbeatId, RFC3339Utils.parseToUTC(beforeTime), RFC3339Utils.parseToUTC(afterTime)); - } else { - return new SysomosHeartbeatStream(this, heartbeatId, null, RFC3339Utils.parseToUTC(afterTime)); - } - } - return new SysomosHeartbeatStream(this, heartbeatId); + if (configMap.containsKey(STARTING_TIME_KEY)) { + Object configIds = configMap.get(STARTING_TIME_KEY); + if (!(configIds instanceof Map)) { + throw new IllegalStateException("Invalid configuration. Added after key must be an instance of Map<String,String> but was " + + configIds); + } + this.addedAfter = (Map)configIds; } - - /** - * Wait for the queue size to be below threshold before allowing execution to continue on this thread - */ - protected void pauseForSpace() { - while(this.providerQueue.size() >= maxQueued) { - LOGGER.trace("Sleeping the current thread due to a full queue"); - try { - Thread.sleep(100); - LOGGER.trace("Resuming thread after wait period"); - } catch (InterruptedException e) { - LOGGER.warn("Thread was interrupted", e); - } - } + if (configMap.containsKey(ENDING_TIME_KEY)) { + Object configIds = configMap.get(ENDING_TIME_KEY); + if (!(configIds instanceof Map)) { + throw new IllegalStateException("Invalid configuration. Added before key must be an instance of Map<String,String> but was " + + configIds); + } + this.addedBefore = (Map)configIds; } - - @SuppressWarnings("unchecked") - protected void extractConfigFromMap(Map configMap) { - if(configMap.containsKey(MODE_KEY)) { - Object configMode = configMap.get(MODE_KEY); - if(!(configMode instanceof Mode)) { - throw new IllegalStateException("Invalid configuration. Mode must be an instance of the Mode enum but was " + configMode); - } - this.mode = (Mode)configMode; - } - if(configMap.containsKey(STARTING_DOCS_KEY)) { - Object configIds = configMap.get(STARTING_DOCS_KEY); - if(!(configIds instanceof Map)) { - throw new IllegalStateException("Invalid configuration. StartingDocs must be an instance of Map<String,String> but was " + configIds); - } - this.documentIds = (Map)configIds; - } - if(configMap.containsKey(STARTING_TIME_KEY)) { - Object configIds = configMap.get(STARTING_TIME_KEY); - if(!(configIds instanceof Map)) { - throw new IllegalStateException("Invalid configuration. Added after key must be an instance of Map<String,String> but was " + configIds); - } - this.addedAfter = (Map)configIds; - } - if(configMap.containsKey(ENDING_TIME_KEY)) { - Object configIds = configMap.get(ENDING_TIME_KEY); - if(!(configIds instanceof Map)) { - throw new IllegalStateException("Invalid configuration. 
Added before key must be an instance of Map<String,String> but was " + configIds); - } - this.addedBefore = (Map)configIds; + } + + private Queue<StreamsDatum> constructQueue() { + return Queues.newConcurrentLinkedQueue(); + } + + public int getCount() { + return this.count.get(); + } + + /** + * To use from command line: + * + * <p/> + * Supply configuration similar to src/test/resources/rss.conf + * + * <p/> + * Launch using: + * + * <p/> + * mvn exec:java -Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider -Dexec.args="rss.conf articles.json" + * + * @param args args + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File file = new File(configfile); + assert (file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + SysomosConfiguration config = new ComponentConfigurator<>(SysomosConfiguration.class).detectConfiguration(typesafe, "rss"); + SysomosProvider provider = new SysomosProvider(config); + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while (iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + json = mapper.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException ex) { + System.err.println(ex.getMessage()); } + } } - - private Queue<StreamsDatum> constructQueue() { - return Queues.newConcurrentLinkedQueue(); - } - - public int getCount() { - return this.count.get(); - } - - public static void main(String[] args) throws Exception { - - Preconditions.checkArgument(args.length >= 2); - - String configfile = args[0]; - String outfile = args[1]; - - Config reference = ConfigFactory.load(); - File conf_file = new File(configfile); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - SysomosConfiguration config = new ComponentConfigurator<>(SysomosConfiguration.class).detectConfiguration(typesafe, "rss"); - SysomosProvider provider = new SysomosProvider(config); - - ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); - while(iterator.hasNext()) { - StreamsDatum datum = iterator.next(); - String json; - try { - json = mapper.writeValueAsString(datum.getDocument()); - 
outStream.println(json); - } catch (JsonProcessingException e) { - System.err.println(e.getMessage()); - } - } - } while( provider.isRunning()); - provider.cleanUp(); - outStream.flush(); - } + while ( provider.isRunning() ); + provider.cleanUp(); + outStream.flush(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java index 3b6a843..82d538d 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java @@ -19,9 +19,10 @@ package org.apache.streams.sysomos.util; +import org.apache.streams.sysomos.SysomosException; + import com.google.common.base.Strings; import org.apache.commons.io.IOUtils; -import org.apache.streams.sysomos.SysomosException; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import org.slf4j.Logger; @@ -36,49 +37,53 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; /** - * Provides utilities for working with Sysomos + * Provides utilities for working with Sysomos. */ public class SysomosUtils { - public static final Pattern CODE_PATTERN = Pattern.compile("code: ([0-9]+)"); - public static final DateTimeFormatter SYSOMOS_DATE_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZoneUTC(); - private final static Logger LOGGER = LoggerFactory.getLogger(SysomosUtils.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SysomosUtils.class); + + public static final Pattern CODE_PATTERN = Pattern.compile("code: ([0-9]+)"); + public static final DateTimeFormatter SYSOMOS_DATE_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZoneUTC(); - private SysomosUtils() {} + private SysomosUtils() {} - /** - * Queries the sysomos URL and provides the response as a String - * - * @param url the Sysomos URL to query - * @return valid XML String - */ - public static String queryUrl(URL url) { - try { - HttpURLConnection cn = (HttpURLConnection) url.openConnection(); - cn.setRequestMethod("GET"); - cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8"); - cn.setDoInput(true); - cn.setDoOutput(false); - StringWriter writer = new StringWriter(); - IOUtils.copy(new InputStreamReader(cn.getInputStream()), writer); - writer.flush(); + /** + * Queries the sysomos URL and provides the response as a String. 
+ * + * @param url the Sysomos URL to query + * @return valid XML String + */ + public static String queryUrl(URL url) { + try { + HttpURLConnection cn = (HttpURLConnection) url.openConnection(); + cn.setRequestMethod("GET"); + cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8"); + cn.setDoInput(true); + cn.setDoOutput(false); + StringWriter writer = new StringWriter(); + IOUtils.copy(new InputStreamReader(cn.getInputStream()), writer); + writer.flush(); - String xmlResponse = writer.toString(); - if (Strings.isNullOrEmpty(xmlResponse)) { - throw new SysomosException("XML Response from Sysomos was empty : " + xmlResponse + "\n" + cn.getResponseMessage(), cn.getResponseCode()); - } - return xmlResponse; - } catch (IOException e) { - LOGGER.error("Error executing request : {}", e, url.toString()); - String message = e.getMessage(); - Matcher match = CODE_PATTERN.matcher(message); - if(match.find()) { - int errorCode = Integer.parseInt(match.group(1)); - throw new SysomosException(message, e, errorCode); - } - else { - throw new SysomosException(e.getMessage(), e); - } - } + String xmlResponse = writer.toString(); + if (Strings.isNullOrEmpty(xmlResponse)) { + throw new SysomosException("XML Response from Sysomos was empty : " + + xmlResponse + + "\n" + + cn.getResponseMessage(), + cn.getResponseCode()); + } + return xmlResponse; + } catch (IOException ex) { + LOGGER.error("Error executing request : {}", ex, url.toString()); + String message = ex.getMessage(); + Matcher match = CODE_PATTERN.matcher(message); + if (match.find()) { + int errorCode = Integer.parseInt(match.group(1)); + throw new SysomosException(message, ex, errorCode); + } else { + throw new SysomosException(ex.getMessage(), ex); + } } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosJsonSerDeIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosJsonSerDeIT.java b/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosJsonSerDeIT.java index e3b4848..7efffcc 100644 --- a/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosJsonSerDeIT.java +++ b/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosJsonSerDeIT.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.sysomos.json.Sysomos; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,34 +35,34 @@ import java.io.InputStreamReader; */ public class SysomosJsonSerDeIT { - private final static Logger LOGGER = LoggerFactory.getLogger(SysomosJsonSerDeIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SysomosJsonSerDeIT.class); - private ObjectMapper mapper = new ObjectMapper(); + private ObjectMapper mapper = new ObjectMapper(); - @Test - public void Test() - { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); - mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); + @Test + public void testSysomosJsonSerDe() { - InputStream is = SysomosJsonSerDeIT.class.getResourceAsStream("/sysomos_jsons.txt"); - InputStreamReader isr = 
new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); + mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); - try { - while (br.ready()) { - String line = br.readLine(); - LOGGER.debug(line); + InputStream is = SysomosJsonSerDeIT.class.getResourceAsStream("/sysomos_jsons.txt"); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); - Sysomos ser = mapper.readValue(line, Sysomos.class); + try { + while (br.ready()) { + String line = br.readLine(); + LOGGER.debug(line); - String des = mapper.writeValueAsString(ser); - LOGGER.debug(des); - } - } catch( Exception e ) { - e.printStackTrace(); - Assert.fail(); - } + Sysomos ser = mapper.readValue(line, Sysomos.class); + + String des = mapper.writeValueAsString(ser); + LOGGER.debug(des); + } + } catch ( Exception ex ) { + ex.printStackTrace(); + Assert.fail(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosXmlSerDeIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosXmlSerDeIT.java b/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosXmlSerDeIT.java index b9ee2e1..e078d02 100644 --- a/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosXmlSerDeIT.java +++ b/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosXmlSerDeIT.java @@ -27,7 +27,6 @@ import com.fasterxml.jackson.dataformat.xml.XmlMapper; import com.sysomos.xml.BeatApi; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,60 +40,63 @@ import java.io.InputStreamReader; */ public class SysomosXmlSerDeIT { - private final static Logger LOGGER = LoggerFactory.getLogger(SysomosXmlSerDeIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SysomosXmlSerDeIT.class); - private XmlMapper xmlMapper; + private XmlMapper xmlMapper; - @Before - public void Before() { + /** + * before. 
+ */ + @Before + public void before() { - XmlFactory f = new XmlFactory(new InputFactoryImpl(), - new OutputFactoryImpl()); + XmlFactory xmlFactory = new XmlFactory(new InputFactoryImpl(), + new OutputFactoryImpl()); - JacksonXmlModule module = new JacksonXmlModule(); + JacksonXmlModule module = new JacksonXmlModule(); - module.setDefaultUseWrapper(false); + module.setDefaultUseWrapper(false); - xmlMapper = new XmlMapper(f, module); + xmlMapper = new XmlMapper(xmlFactory, module); - xmlMapper - .configure( - DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, - Boolean.TRUE); - xmlMapper - .configure( - DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, - Boolean.TRUE); - xmlMapper - .configure( - DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, - Boolean.TRUE); - xmlMapper.configure( - DeserializationFeature.READ_ENUMS_USING_TO_STRING, - Boolean.TRUE); + xmlMapper + .configure( + DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, + Boolean.TRUE); + xmlMapper + .configure( + DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, + Boolean.TRUE); + xmlMapper + .configure( + DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, + Boolean.TRUE); + xmlMapper.configure( + DeserializationFeature.READ_ENUMS_USING_TO_STRING, + Boolean.TRUE); - } + } + + @Test + public void test() { + + InputStream is = SysomosXmlSerDeIT.class.getResourceAsStream("/sysomos_xmls.txt"); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + + try { + while (br.ready()) { + String line = br.readLine(); + LOGGER.debug(line); + + BeatApi ser = xmlMapper.readValue(line, BeatApi.class); - @Test - public void Test() - { - InputStream is = SysomosXmlSerDeIT.class.getResourceAsStream("/sysomos_xmls.txt"); - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); - - try { - while (br.ready()) { - String line = br.readLine(); - LOGGER.debug(line); - - BeatApi ser = xmlMapper.readValue(line, BeatApi.class); - - String des = xmlMapper.writeValueAsString(ser); - LOGGER.debug(des); - } - } catch( Exception e ) { - e.printStackTrace(); - Assert.fail(); - } + String des = xmlMapper.writeValueAsString(ser); + LOGGER.debug(des); + } + } catch ( Exception ex ) { + ex.printStackTrace(); + Assert.fail(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/provider/SysomosProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/provider/SysomosProviderIT.java b/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/provider/SysomosProviderIT.java index b4289ee..a088726 100644 --- a/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/provider/SysomosProviderIT.java +++ b/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/provider/SysomosProviderIT.java @@ -18,63 +18,49 @@ package org.apache.streams.sysomos.test.provider; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import com.sysomos.SysomosConfiguration; -import org.apache.commons.lang.StringUtils; import org.apache.streams.jackson.StreamsJacksonMapper; import 
org.apache.streams.sysomos.provider.SysomosProvider; -import org.junit.Assert; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.File; -import java.io.FileOutputStream; import java.io.FileReader; -import java.io.InputStream; -import java.io.InputStreamReader; import java.io.LineNumberReader; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.util.List; /** - * Integration test for SysomosProviderIT - * - * Created by sblackmon on 10/21/16. + * Integration test for SysomosProviderIT. */ @Ignore("this is ignored because the project doesn't have credentials to test it with during CI") public class SysomosProviderIT { - private final static Logger LOGGER = LoggerFactory.getLogger(SysomosProviderIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SysomosProviderIT.class); - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - @Test - public void testRssStreamProvider() throws Exception { + @Test + public void testRssStreamProvider() throws Exception { - String configfile = "./target/test-classes/RssStreamProviderIT.conf"; - String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt"; + String configfile = "./target/test-classes/RssStreamProviderIT.conf"; + String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt"; - SysomosProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); + SysomosProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); - File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); - FileReader outReader = new FileReader(out); - LineNumberReader outCounter = new LineNumberReader(outReader); + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); - while(outCounter.readLine() != null) {} + while (outCounter.readLine() != null) {} - assert (outCounter.getLineNumber() >= 1); + assert (outCounter.getLineNumber() >= 1); - } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/StreamsTwitterMapper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/StreamsTwitterMapper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/StreamsTwitterMapper.java index aca185c..671a830 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/StreamsTwitterMapper.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/StreamsTwitterMapper.java @@ -18,69 +18,87 @@ package org.apache.streams.twitter.converter; +import org.apache.streams.data.util.RFC3339Utils; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.twitter.converter.util.TwitterActivityUtil; + import com.fasterxml.jackson.core.JsonParser; import 
com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; import com.fasterxml.jackson.databind.module.SimpleModule; -import org.apache.streams.data.util.RFC3339Utils; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; /** * This class assist with handling twitter's date-time format during conversion * + * <p/> * Deprecated: use StreamsJacksonMapper.getInstance() with TwitterDateTimeFormat on the classpath instead */ @Deprecated public class StreamsTwitterMapper extends StreamsJacksonMapper { - public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy"; + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterActivityUtil.class); - public static final DateTimeFormatter TWITTER_FORMATTER = DateTimeFormat.forPattern(TWITTER_FORMAT); + public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy"; - public static Long getMillis(String dateTime) { + public static final DateTimeFormatter TWITTER_FORMATTER = DateTimeFormat.forPattern(TWITTER_FORMAT); - // this function is for pig which doesn't handle exceptions well - try { - return TWITTER_FORMATTER.parseMillis(dateTime); - } catch( Exception e ) { - return null; - } + /** + * Convert to millis with TWITTER_FORMATTER. + * @param dateTime dateTime as String + * @return millis as Long + */ + public static Long getMillis(String dateTime) { + // this function is for pig which doesn't handle exceptions well + try { + return TWITTER_FORMATTER.parseMillis(dateTime); + } catch ( Exception ex ) { + return null; } - private static final StreamsTwitterMapper INSTANCE = new StreamsTwitterMapper(); + } - public static StreamsTwitterMapper getInstance(){ - return INSTANCE; - } + private static final StreamsTwitterMapper INSTANCE = new StreamsTwitterMapper(); + + public static StreamsTwitterMapper getInstance() { + return INSTANCE; + } - public StreamsTwitterMapper() { - super(); - registerModule(new SimpleModule() - { - { - addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) { - @Override - public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException { - DateTime result = null; - try { - result = TWITTER_FORMATTER.parseDateTime(jpar.getValueAsString()); - } catch( Exception ignored ) { } - try { - result = RFC3339Utils.getInstance().parseToUTC(jpar.getValueAsString()); - } catch( Exception ignored ) { } - return result; - } - }); + /** + * StreamsTwitterMapper constructor. 
+ */ + public StreamsTwitterMapper() { + super(); + registerModule(new SimpleModule() { + { + addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) { + @Override + public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException { + DateTime result = null; + try { + result = TWITTER_FORMATTER.parseDateTime(jpar.getValueAsString()); + } catch ( Exception ignored ) { + LOGGER.trace("ignored", ignored); } + try { + result = RFC3339Utils.getInstance().parseToUTC(jpar.getValueAsString()); + } catch ( Exception ignored ) { + LOGGER.trace("ignored", ignored); + } + return result; + } }); + } + }); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDateTimeFormat.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDateTimeFormat.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDateTimeFormat.java index 5a34868..d8da2c1 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDateTimeFormat.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDateTimeFormat.java @@ -20,12 +20,15 @@ package org.apache.streams.twitter.converter; import org.apache.streams.jackson.StreamsDateTimeFormat; +/** + * TwitterDateTimeFormat. + */ public class TwitterDateTimeFormat implements StreamsDateTimeFormat { - public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy"; + public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy"; - @Override - public String getFormat() { - return TWITTER_FORMAT; - } + @Override + public String getFormat() { + return TWITTER_FORMAT; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java index 3c71f9a..f555e8d 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java @@ -18,10 +18,6 @@ package org.apache.streams.twitter.converter; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import org.apache.streams.data.DocumentClassifier; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.pojo.Delete; @@ -32,6 +28,11 @@ import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.twitter.pojo.User; import org.apache.streams.twitter.pojo.UserstreamEvent; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import 
com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -39,47 +40,53 @@ import java.util.List; import static org.apache.streams.twitter.converter.TwitterDateTimeFormat.TWITTER_FORMAT; /** - * Ensures twitter documents can be converted to Activity + * Ensures twitter documents can be converted to Activity. */ public class TwitterDocumentClassifier implements DocumentClassifier { - public List<Class> detectClasses(Object document) { + @Override + public List<Class> detectClasses(Object document) { - Preconditions.checkNotNull(document); + Preconditions.checkNotNull(document); - ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TWITTER_FORMAT)); + ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TWITTER_FORMAT)); - ObjectNode objectNode; - try { - if( document instanceof String ) - objectNode = mapper.readValue((String)document, ObjectNode.class); - else if( document instanceof ObjectNode ) - objectNode = (ObjectNode) document; - else - objectNode = mapper.convertValue(document, ObjectNode.class); - } catch (IOException e) { - return new ArrayList<>(); - } - - List<Class> classList = new ArrayList<>(); + ObjectNode objectNode; + try { + if ( document instanceof String ) { + objectNode = mapper.readValue((String) document, ObjectNode.class); + } else if ( document instanceof ObjectNode ) { + objectNode = (ObjectNode) document; + } else { + objectNode = mapper.convertValue(document, ObjectNode.class); + } + } catch (IOException ex) { + return new ArrayList<>(); + } - if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null) - classList.add(Retweet.class); - else if( objectNode.findValue("delete") != null ) - classList.add(Delete.class); - else if( objectNode.findValue("friends") != null || - objectNode.findValue("friends_str") != null ) - classList.add(FriendList.class); - else if( objectNode.findValue("target_object") != null ) - classList.add(UserstreamEvent.class); - else if( objectNode.findValue("follower") != null && objectNode.findValue("followee") != null) - classList.add(Follow.class); - else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null) - classList.add(User.class); - else - classList.add(Tweet.class); + List<Class> classList = new ArrayList<>(); - return classList; + if ( objectNode.findValue("retweeted_status") != null + && objectNode.get("retweeted_status") != null) { + classList.add(Retweet.class); + } else if ( objectNode.findValue("delete") != null ) { + classList.add(Delete.class); + } else if ( objectNode.findValue("friends") != null + || objectNode.findValue("friends_str") != null ) { + classList.add(FriendList.class); + } else if ( objectNode.findValue("target_object") != null ) { + classList.add(UserstreamEvent.class); + } else if ( objectNode.findValue("follower") != null + && objectNode.findValue("followee") != null) { + classList.add(Follow.class); + } else if ( objectNode.findValue("location") != null + && objectNode.findValue("user") == null) { + classList.add(User.class); + } else { + classList.add(Tweet.class); } + return classList; + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java ---------------------------------------------------------------------- diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java index e0ed4a4..f34c14a 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java @@ -18,67 +18,69 @@ package org.apache.streams.twitter.converter; -import com.google.common.collect.Lists; -import org.apache.commons.lang.NotImplementedException; import org.apache.streams.data.ActivityConverter; import org.apache.streams.exceptions.ActivityConversionException; import org.apache.streams.pojo.json.Activity; -import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.pojo.json.Provider; -import org.apache.streams.twitter.pojo.Follow; import org.apache.streams.twitter.converter.util.TwitterActivityUtil; +import org.apache.streams.twitter.pojo.Follow; + +import com.google.common.collect.Lists; +import org.apache.commons.lang.NotImplementedException; import java.io.Serializable; import java.util.List; public class TwitterFollowActivityConverter implements ActivityConverter<Follow>, Serializable { - public TwitterFollowActivityConverter() { - } - - private static TwitterFollowActivityConverter instance = new TwitterFollowActivityConverter(); - - public static TwitterFollowActivityConverter getInstance() { - return instance; - } - - public static Class requiredClass = Follow.class; - - @Override - public Class requiredClass() { - return requiredClass; - } - - @Override - public String serializationFormat() { - return null; - } - - @Override - public Follow fromActivity(Activity deserialized) throws ActivityConversionException { - throw new NotImplementedException(); - } - - @Override - public List<Activity> toActivityList(Follow event) throws ActivityConversionException { - - Activity activity = new Activity(); - activity.setVerb("follow"); - activity.setActor(TwitterActivityUtil.buildActor(event.getFollower())); - activity.setObject(TwitterActivityUtil.buildActor(event.getFollowee())); - activity.setId(activity.getActor().getId() + "-follow->" + activity.getObject().getId()); - activity.setProvider((Provider) new Provider().withId("twitter")); - return Lists.newArrayList(activity); - } - - @Override - public List<Follow> fromActivityList(List<Activity> list) { - throw new NotImplementedException(); - } - - @Override - public List<Activity> toActivityList(List<Follow> list) { - throw new NotImplementedException(); - } + public TwitterFollowActivityConverter() { + } + + private static TwitterFollowActivityConverter instance = new TwitterFollowActivityConverter(); + + public static TwitterFollowActivityConverter getInstance() { + return instance; + } + + public static Class requiredClass = Follow.class; + + @Override + public Class requiredClass() { + return requiredClass; + } + + @Override + public String serializationFormat() { + return null; + } + + @Override + public Follow fromActivity(Activity deserialized) throws ActivityConversionException { + throw new NotImplementedException(); + } + + @Override + public List<Follow> fromActivityList(List<Activity> list) { + throw new NotImplementedException(); + } + + @Override + public List<Activity> toActivityList(Follow event) throws ActivityConversionException { + + 
Activity activity = new Activity(); + activity.setVerb("follow"); + activity.setActor(TwitterActivityUtil.buildActor(event.getFollower())); + activity.setObject(TwitterActivityUtil.buildActor(event.getFollowee())); + activity.setId(activity.getActor().getId() + "-follow->" + activity.getObject().getId()); + activity.setProvider((Provider) new Provider().withId("twitter")); + return Lists.newArrayList(activity); + } + + @Override + public List<Activity> toActivityList(List<Follow> list) { + throw new NotImplementedException(); + } + + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java index 3e61ef9..ac031b4 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java @@ -18,8 +18,6 @@ package org.apache.streams.twitter.converter; -import com.google.common.collect.Lists; -import org.apache.commons.lang.NotImplementedException; import org.apache.streams.data.ActivityConverter; import org.apache.streams.exceptions.ActivityConversionException; import org.apache.streams.pojo.json.Activity; @@ -27,63 +25,63 @@ import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Tweet; +import com.google.common.collect.Lists; +import org.apache.commons.lang.NotImplementedException; + import java.io.Serializable; import java.util.List; import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity; - /** -* Created with IntelliJ IDEA. -* User: mdelaet -* Date: 9/30/13 -* Time: 9:24 AM -* To change this template use File | Settings | File Templates. -*/ + * TwitterJsonDeleteActivityConverter. 
+ */ +//TODO: use class explicitly somewhere public class TwitterJsonDeleteActivityConverter implements ActivityConverter<Delete>, Serializable { - public static Class requiredClass = Delete.class; + public static Class requiredClass = Delete.class; - @Override - public Class requiredClass() { - return requiredClass; - } + @Override + public Class requiredClass() { + return requiredClass; + } - private static TwitterJsonDeleteActivityConverter instance = new TwitterJsonDeleteActivityConverter(); + private static TwitterJsonDeleteActivityConverter instance = new TwitterJsonDeleteActivityConverter(); - public static TwitterJsonDeleteActivityConverter getInstance() { - return instance; - } + public static TwitterJsonDeleteActivityConverter getInstance() { + return instance; + } - @Override - public String serializationFormat() { - return null; - } + @Override + public String serializationFormat() { + return null; + } - @Override - public Delete fromActivity(Activity deserialized) throws ActivityConversionException { - throw new NotImplementedException(); - } + @Override + public Delete fromActivity(Activity deserialized) throws ActivityConversionException { + throw new NotImplementedException(); + } - @Override - public List<Activity> toActivityList(List<Delete> serializedList) { - throw new NotImplementedException(); - } + @Override + public List<Delete> fromActivityList(List<Activity> list) { + throw new NotImplementedException(); + } - public List<Activity> toActivityList(Delete delete) throws ActivityConversionException { + @Override + public List<Activity> toActivityList(List<Delete> serializedList) { + throw new NotImplementedException(); + } - Activity activity = new Activity(); - updateActivity(delete, activity); - return Lists.newArrayList(activity); - } + @Override + public List<Activity> toActivityList(Delete delete) throws ActivityConversionException { - @Override - public List<Delete> fromActivityList(List<Activity> list) { - throw new NotImplementedException(); - } + Activity activity = new Activity(); + updateActivity(delete, activity); + return Lists.newArrayList(activity); + } - public ActivityObject buildTarget(Tweet tweet) { - return null; - } + public ActivityObject buildTarget(Tweet tweet) { + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java index 30a1916..13e2568 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java @@ -18,13 +18,14 @@ package org.apache.streams.twitter.converter; -import com.google.common.collect.Lists; -import org.apache.commons.lang.NotImplementedException; import org.apache.streams.data.ActivityConverter; import org.apache.streams.exceptions.ActivityConversionException; import org.apache.streams.pojo.json.Activity; import org.apache.streams.twitter.pojo.Retweet; +import com.google.common.collect.Lists; 
+import org.apache.commons.lang.NotImplementedException; + import java.io.Serializable; import java.util.List; @@ -32,52 +33,54 @@ import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.upda public class TwitterJsonRetweetActivityConverter implements ActivityConverter<Retweet>, Serializable { - public static Class requiredClass = Retweet.class; - - @Override - public Class requiredClass() { - return requiredClass; - } - - private static TwitterJsonRetweetActivityConverter instance = new TwitterJsonRetweetActivityConverter(); - - public static TwitterJsonRetweetActivityConverter getInstance() { - return instance; - } - - @Override - public String serializationFormat() { - return null; - } - - @Override - public Retweet fromActivity(Activity deserialized) throws ActivityConversionException { - throw new NotImplementedException(); - } - - @Override - public List<Activity> toActivityList(Retweet retweet) throws ActivityConversionException { - - Activity activity = new Activity(); - updateActivity(retweet, activity); - - return Lists.newArrayList(activity); - } - - @Override - public List<Retweet> fromActivityList(List<Activity> list) { - throw new NotImplementedException(); - } - - @Override - public List<Activity> toActivityList(List<Retweet> serializedList) { - List<Activity> result = Lists.newArrayList(); - for( Retweet item : serializedList ) { - try { - List<Activity> activities = toActivityList(item); - result.addAll(activities); - } catch (ActivityConversionException e) {} - } - return result; + public static Class requiredClass = Retweet.class; + + @Override + public Class requiredClass() { + return requiredClass; + } + + private static TwitterJsonRetweetActivityConverter instance = new TwitterJsonRetweetActivityConverter(); + + public TwitterJsonRetweetActivityConverter getInstance() { + return instance; + } + + @Override + public String serializationFormat() { + return null; + } + + @Override + public Retweet fromActivity(Activity deserialized) throws ActivityConversionException { + throw new NotImplementedException(); + } + + @Override + public List<Retweet> fromActivityList(List<Activity> list) { + throw new NotImplementedException(); + } + + @Override + public List<Activity> toActivityList(Retweet retweet) throws ActivityConversionException { + + Activity activity = new Activity(); + updateActivity(retweet, activity); + + return Lists.newArrayList(activity); + } + + @Override + public List<Activity> toActivityList(List<Retweet> serializedList) { + List<Activity> result = Lists.newArrayList(); + for ( Retweet item : serializedList ) { + try { + List<Activity> activities = toActivityList(item); + result.addAll(activities); + } catch (ActivityConversionException ex) { + // + } } + return result; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java index 0997a7f..c3b5b15 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java +++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java @@ -18,13 +18,14 @@ package org.apache.streams.twitter.converter; -import com.google.common.collect.Lists; -import org.apache.commons.lang.NotImplementedException; import org.apache.streams.data.ActivityConverter; import org.apache.streams.exceptions.ActivityConversionException; import org.apache.streams.pojo.json.Activity; import org.apache.streams.twitter.pojo.Tweet; +import com.google.common.collect.Lists; +import org.apache.commons.lang.NotImplementedException; + import java.io.Serializable; import java.util.List; @@ -32,53 +33,55 @@ import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.upda public class TwitterJsonTweetActivityConverter implements ActivityConverter<Tweet>, Serializable { - public static Class requiredClass = Tweet.class; + public static Class requiredClass = Tweet.class; - @Override - public Class requiredClass() { - return requiredClass; - } + @Override + public Class requiredClass() { + return requiredClass; + } - private static TwitterJsonTweetActivityConverter instance = new TwitterJsonTweetActivityConverter(); + private static TwitterJsonTweetActivityConverter instance = new TwitterJsonTweetActivityConverter(); - public static TwitterJsonTweetActivityConverter getInstance() { - return instance; - } + public static TwitterJsonTweetActivityConverter getInstance() { + return instance; + } - @Override - public String serializationFormat() { - return null; - } + @Override + public String serializationFormat() { + return null; + } - @Override - public Tweet fromActivity(Activity deserialized) throws ActivityConversionException { - throw new NotImplementedException(); - } + @Override + public Tweet fromActivity(Activity deserialized) throws ActivityConversionException { + throw new NotImplementedException(); + } - @Override - public List<Activity> toActivityList(Tweet tweet) throws ActivityConversionException { + @Override + public List<Tweet> fromActivityList(List<Activity> list) { + throw new NotImplementedException(); + } - Activity activity = new Activity(); + @Override + public List<Activity> toActivityList(Tweet tweet) throws ActivityConversionException { - updateActivity(tweet, activity); + Activity activity = new Activity(); - return Lists.newArrayList(activity); - } + updateActivity(tweet, activity); - @Override - public List<Tweet> fromActivityList(List<Activity> list) { - throw new NotImplementedException(); - } + return Lists.newArrayList(activity); + } - @Override - public List<Activity> toActivityList(List<Tweet> serializedList) { - List<Activity> result = Lists.newArrayList(); - for( Tweet item : serializedList ) { - try { - List<Activity> activities = toActivityList(item); - result.addAll(activities); - } catch (ActivityConversionException e) {} - } - return result; + @Override + public List<Activity> toActivityList(List<Tweet> serializedList) { + List<Activity> result = Lists.newArrayList(); + for ( Tweet item : serializedList ) { + try { + List<Activity> activities = toActivityList(item); + result.addAll(activities); + } catch (ActivityConversionException ex) { + // + } } + return result; + } }
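
Editor's note (not part of the commit): the date handling above centers on TWITTER_FORMAT ("EEE MMM dd HH:mm:ss Z yyyy"), exposed both by the deprecated StreamsTwitterMapper and by TwitterDateTimeFormat. A minimal sketch of parsing a created_at value with that pattern via Joda-Time, the same formatter construction used in StreamsTwitterMapper; the class name and the sample timestamp are illustrative assumptions, not taken from the commit:

    import org.joda.time.DateTime;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    public class TwitterDateFormatSketch {

      // Same pattern string as TwitterDateTimeFormat.TWITTER_FORMAT / StreamsTwitterMapper.TWITTER_FORMAT.
      private static final DateTimeFormatter TWITTER_FORMATTER =
          DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");

      public static void main(String[] args) {
        // Illustrative created_at value in twitter's wire format.
        DateTime created = TWITTER_FORMATTER.parseDateTime("Wed Aug 27 13:08:45 +0000 2008");
        System.out.println(created.getMillis());
      }
    }

In practice the commit's recommendation is to put TwitterDateTimeFormat on the classpath and let StreamsJacksonMapper pick the format up, rather than using the deprecated mapper directly.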

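Editor's note (not part of the commit): a minimal usage sketch tying together the classifier and converter classes refactored above, assuming a plain status payload. Classify the raw document with TwitterDocumentClassifier, deserialize it with a mapper configured for TWITTER_FORMAT, then convert it with TwitterJsonTweetActivityConverter. The JSON fields and the wrapper class name are illustrative assumptions; the API calls themselves appear in the diffs above.

    import org.apache.streams.jackson.StreamsJacksonMapper;
    import org.apache.streams.pojo.json.Activity;
    import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
    import org.apache.streams.twitter.converter.TwitterJsonTweetActivityConverter;
    import org.apache.streams.twitter.pojo.Tweet;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.collect.Lists;

    import java.util.List;

    import static org.apache.streams.twitter.converter.TwitterDateTimeFormat.TWITTER_FORMAT;

    public class TwitterConverterSketch {

      public static void main(String[] args) throws Exception {
        // Illustrative status payload; real tweets carry many more fields.
        String json = "{\"created_at\":\"Wed Aug 27 13:08:45 +0000 2008\",\"id_str\":\"1\",\"text\":\"hello world\"}";

        // Classify the raw document; a plain status with none of the special markers resolves to Tweet.class.
        List<Class> classes = new TwitterDocumentClassifier().detectClasses(json);

        if (classes.contains(Tweet.class)) {
          // Mapper configured to understand twitter's created_at format.
          ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TWITTER_FORMAT));
          Tweet tweet = mapper.readValue(json, Tweet.class);

          // Convert the tweet pojo into activitystreams Activity objects.
          List<Activity> activities = TwitterJsonTweetActivityConverter.getInstance().toActivityList(tweet);
          System.out.println(activities.size());
        }
      }
    }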