http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java index d7dc918..9de1863 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java @@ -18,14 +18,6 @@ package org.apache.streams.rss.provider; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Uninterruptibles; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; @@ -37,6 +29,15 @@ import org.apache.streams.rss.FeedDetails; import org.apache.streams.rss.RssStreamConfiguration; import org.apache.streams.rss.provider.perpetual.RssFeedScheduler; import org.apache.streams.util.ComponentUtils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; import 
org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,173 +62,159 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * RSS {@link org.apache.streams.core.StreamsProvider} that provides content from rss feeds in boilerpipe format - * - * To use from command line: - * - * Supply configuration similar to src/test/resources/rss.conf - * - * Launch using: - * - * mvn exec:java -Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider -Dexec.args="rss.conf articles.json" */ public class RssStreamProvider implements StreamsProvider { - public static final String STREAMS_ID = "RssStreamProvider"; - - private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamProvider.class); - - private final static int MAX_SIZE = 1000; - - private RssStreamConfiguration config; - private boolean perpetual; - private ExecutorService executor; - private BlockingQueue<StreamsDatum> dataQueue; - private AtomicBoolean isComplete; - - @VisibleForTesting - protected RssFeedScheduler scheduler; - - public RssStreamProvider() { - this(new ComponentConfigurator<>(RssStreamConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), false); - } - - public RssStreamProvider(boolean perpetual) { - this(new ComponentConfigurator<>(RssStreamConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), perpetual); - } - - public RssStreamProvider(RssStreamConfiguration config) { - this(config, false); - } - - public RssStreamProvider(RssStreamConfiguration config, boolean perpetual) { - this.perpetual = perpetual; - this.config = config; - } - - @Override - public String getId() { - return STREAMS_ID; - } - - public void setConfig(RssStreamConfiguration config) { - this.config = config; - } - - public void setRssFeeds(Set<String> urlFeeds) { - } - - public void setRssFeeds(Map<String, Long> feeds) { - if(this.config == null) { - this.config = new RssStreamConfiguration(); - } - 
List<FeedDetails> feedDetails = new ArrayList<>(); - for(String feed : feeds.keySet()) { - Long delay = feeds.get(feed); - FeedDetails detail = new FeedDetails(); - detail.setUrl(feed); - detail.setPollIntervalMillis(delay); - feedDetails.add(detail); - } - this.config.setFeeds(feedDetails); - } - - @Override - public void startStream() { - LOGGER.trace("Starting Rss Scheduler"); - this.executor.submit(this.scheduler); - } - - @Override - public StreamsResultSet readCurrent() { - Queue<StreamsDatum> batch = new ConcurrentLinkedQueue<>(); - int batchSize = 0; - while(!this.dataQueue.isEmpty() && batchSize < MAX_SIZE) { - StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.dataQueue); - if(datum != null) { - ++batchSize; - batch.add(datum); - } + public static final String STREAMS_ID = "RssStreamProvider"; + + private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProvider.class); + + private static final int MAX_SIZE = 1000; + + private RssStreamConfiguration config; + private boolean perpetual; + private ExecutorService executor; + private BlockingQueue<StreamsDatum> dataQueue; + private AtomicBoolean isComplete; + + @VisibleForTesting + protected RssFeedScheduler scheduler; + + public RssStreamProvider() { + this(new ComponentConfigurator<>(RssStreamConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), false); + } + + public RssStreamProvider(boolean perpetual) { + this(new ComponentConfigurator<>(RssStreamConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), perpetual); + } + + public RssStreamProvider(RssStreamConfiguration config) { + this(config, false); + } + + public RssStreamProvider(RssStreamConfiguration config, boolean perpetual) { + this.perpetual = perpetual; + this.config = config; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void startStream() { + LOGGER.trace("Starting Rss Scheduler"); + 
this.executor.submit(this.scheduler); + } + + @Override + public StreamsResultSet readCurrent() { + Queue<StreamsDatum> batch = new ConcurrentLinkedQueue<>(); + int batchSize = 0; + while (!this.dataQueue.isEmpty() && batchSize < MAX_SIZE) { + StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.dataQueue); + if (datum != null) { + ++batchSize; + batch.add(datum); + } + } + this.isComplete.set(this.scheduler.isComplete() && batch.isEmpty() && this.dataQueue.isEmpty()); + return new StreamsResultSet(batch); + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + @Override + public boolean isRunning() { + return !this.isComplete.get(); + } + + @Override + public void prepare(Object configurationObject) { + this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); + this.dataQueue = new LinkedBlockingQueue<>(); + this.scheduler = getScheduler(this.dataQueue); + this.isComplete = new AtomicBoolean(false); + int consecutiveEmptyReads = 0; + } + + @VisibleForTesting + protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> queue) { + if (this.perpetual) { + return new RssFeedScheduler(this.executor, this.config.getFeeds(), queue); + } else { + return new RssFeedScheduler(this.executor, this.config.getFeeds(), queue, 0); + } + } + + @Override + public void cleanUp() { + this.scheduler.stop(); + ComponentUtils.shutdownExecutor(this.executor, 10, 10); + } + + /** + * To use from command line: + * + * <p/> + * Supply configuration similar to src/test/resources/rss.conf + * + * <p/> + * Launch using: + * + * <p/> + * mvn exec:java -Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider -Dexec.args="rss.conf articles.json" + * @param args args + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + 
Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File file = new File(configfile); + assert (file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + RssStreamConfiguration config = new ComponentConfigurator<>(RssStreamConfiguration.class).detectConfiguration(typesafe, "rss"); + RssStreamProvider provider = new RssStreamProvider(config); + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + for (StreamsDatum datum : provider.readCurrent()) { + String json; + try { + json = mapper.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException ex) { + System.err.println(ex.getMessage()); } - this.isComplete.set(this.scheduler.isComplete() && batch.isEmpty() && this.dataQueue.isEmpty()); - return new StreamsResultSet(batch); - } - - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; - } - - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; - } - - @Override - public boolean isRunning() { - return !this.isComplete.get(); - } - - @Override - public void prepare(Object configurationObject) { - this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); - this.dataQueue = new LinkedBlockingQueue<>(); - this.scheduler = getScheduler(this.dataQueue); - this.isComplete = new 
AtomicBoolean(false); - int consecutiveEmptyReads = 0; - } - - @VisibleForTesting - protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> queue) { - if(this.perpetual) - return new RssFeedScheduler(this.executor, this.config.getFeeds(), queue); - else - return new RssFeedScheduler(this.executor, this.config.getFeeds(), queue, 0); - } - - @Override - public void cleanUp() { - this.scheduler.stop(); - ComponentUtils.shutdownExecutor(this.executor, 10, 10); - } - - public static void main(String[] args) throws Exception { - - Preconditions.checkArgument(args.length >= 2); - - String configfile = args[0]; - String outfile = args[1]; - - Config reference = ConfigFactory.load(); - File conf_file = new File(configfile); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - RssStreamConfiguration config = new ComponentConfigurator<>(RssStreamConfiguration.class).detectConfiguration(typesafe, "rss"); - RssStreamProvider provider = new RssStreamProvider(config); - - ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - for (StreamsDatum datum : provider.readCurrent()) { - String json; - try { - json = mapper.writeValueAsString(datum.getDocument()); - outStream.println(json); - } catch (JsonProcessingException e) { - System.err.println(e.getMessage()); - } - } - } while( provider.isRunning()); - provider.cleanUp(); - outStream.flush(); + } } + while ( provider.isRunning()); + provider.cleanUp(); + 
outStream.flush(); + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java index 3800a51..03a66d1 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java @@ -18,27 +18,23 @@ package org.apache.streams.rss.provider; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.data.util.RFC3339Utils; +import org.apache.streams.rss.serializer.SyndEntrySerializer; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.sun.syndication.feed.synd.SyndEntry; import com.sun.syndication.feed.synd.SyndFeed; import com.sun.syndication.io.FeedException; import com.sun.syndication.io.SyndFeedInput; -import com.sun.syndication.io.XmlReader; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.data.util.RFC3339Utils; -import org.apache.streams.rss.FeedDetails; -import org.apache.streams.rss.serializer.SyndEntrySerializer; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStreamReader; -import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; import java.util.Collections; @@ -50,15 +46,19 @@ import java.util.concurrent.ConcurrentHashMap; /** * A {@link 
java.lang.Runnable} task that queues rss feed data. * + * <p/> * <code>RssStreamProviderTask</code> reads the content of an rss feed and queues the articles from * the feed in the form of a {@link com.fasterxml.jackson.databind.node.ObjectNode} wrapped in a {@link org.apache.streams.core.StreamsDatum}. * The task can filter articles by a published date. If the task cannot parse the date of the article or the article does not contain a * published date, by default the task will attempt to queue the article. * - * A task can be run in perpetual mode which will store the article urls in a static variable. The next time a <code>RssStreamProviderTask</code> - * is run, it will not queue data that was seen the previous time the rss feed was read. This is an attempt to reduce - * multiple copies of an article from being out put by a {@link org.apache.streams.rss.provider.RssStreamProvider}. + * <p/> + * A task can be run in perpetual mode which will store the article urls in a static variable. The next time a + * <code>RssStreamProviderTask</code> is run, it will not queue data that was seen the previous time the rss feed was read. + * This is an attempt to reduce multiple copies of an article from being output by a + * {@link org.apache.streams.rss.provider.RssStreamProvider}. * + * <p/> * ** Warning! ** * It still is possible to output multiples of the same article. If multiple task executions for the same rss feed overlap * in execution time, it is possible that the previously seen articles static variable will not have been updated in time. 
@@ -66,183 +66,187 @@ import java.util.concurrent.ConcurrentHashMap; */ public class RssStreamProviderTask implements Runnable { - private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderTask.class); - private static final int DEFAULT_TIME_OUT = 10000; // 10 seconds - private static final String RSS_KEY = "rssFeed"; - private static final String URI_KEY = "uri"; - private static final String LINK_KEY = "link"; - private static final String DATE_KEY = "publishedDate"; - - /** - * Map that contains the Set of previously seen articles by an rss feed. - */ - @VisibleForTesting - protected static final Map<String, Set<String>> PREVIOUSLY_SEEN = new ConcurrentHashMap<>(); - - - private BlockingQueue<StreamsDatum> dataQueue; - private String rssFeed; - private int timeOut; - private SyndEntrySerializer serializer; - private DateTime publishedSince; - private boolean perpetual; - - - /** - * Non-perpetual mode, no date filter, time out of 10 sec - * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask#RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)} - * @param queue - * @param rssFeed - */ - public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed) { - this(queue, rssFeed, new DateTime().minusYears(30), DEFAULT_TIME_OUT, false); - } - - /** - * Non-perpetual mode, no date filter - * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask#RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)} - * @param queue - * @param rssFeed - * @param timeOut - */ - public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, int timeOut) { - this(queue, rssFeed, new DateTime().minusYears(30), timeOut, false); - } - - /** - * Non-perpetual mode, time out of 10 sec - * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask#RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, 
org.joda.time.DateTime, int, boolean)} - * @param queue - * @param rssFeed - * @param publishedSince - */ - public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, DateTime publishedSince) { - this(queue, rssFeed, publishedSince, DEFAULT_TIME_OUT, false); - } - - /** - * RssStreamProviderTask that reads an rss feed url and queues the resulting articles as StreamsDatums with the documents - * being object nodes. - * @param queue Queue to push data to - * @param rssFeed url of rss feed to read - * @param publishedSince DateTime to filter articles by, will queue articles with published times after this - * @param timeOut url connection timeout in milliseconds - * @param perpetual true, if you want to run in perpetual mode. NOT RECOMMENDED - */ - public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, DateTime publishedSince, int timeOut, boolean perpetual) { - this.dataQueue = queue; - this.rssFeed = rssFeed; - this.timeOut = timeOut; - this.publishedSince = publishedSince; - this.serializer = new SyndEntrySerializer(); - this.perpetual = perpetual; - } - - /** - * The rss feed url that this task is responsible for reading - * @return rss feed url - */ - public String getRssFeed() { - return this.rssFeed; - } - - @Override - public void run() { - try { - Set<String> batch = queueFeedEntries(new URL(this.rssFeed)); - if(this.perpetual) - PREVIOUSLY_SEEN.put(this.getRssFeed(), batch); - } catch (IOException | FeedException e) { - LOGGER.warn("Exception while reading rss stream, {} : {}", this.rssFeed, e); - } + private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderTask.class); + private static final int DEFAULT_TIME_OUT = 10000; // 10 seconds + private static final String RSS_KEY = "rssFeed"; + private static final String URI_KEY = "uri"; + private static final String LINK_KEY = "link"; + private static final String DATE_KEY = "publishedDate"; + + /** + * Map that contains the Set of previously seen 
articles by an rss feed. + */ + @VisibleForTesting + protected static final Map<String, Set<String>> PREVIOUSLY_SEEN = new ConcurrentHashMap<>(); + + + private BlockingQueue<StreamsDatum> dataQueue; + private String rssFeed; + private int timeOut; + private SyndEntrySerializer serializer; + private DateTime publishedSince; + private boolean perpetual; + + + /** + * Non-perpetual mode, no date filter, time out of 10 sec + * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask + * #RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)} + * @param queue queue + * @param rssFeed rssFeed + */ + public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed) { + this(queue, rssFeed, new DateTime().minusYears(30), DEFAULT_TIME_OUT, false); + } + + /** + * Non-perpetual mode, no date filter. + * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask + * #RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)} + * @param queue queue + * @param rssFeed rssFeed + * @param timeOut timeOut + */ + public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, int timeOut) { + this(queue, rssFeed, new DateTime().minusYears(30), timeOut, false); + } + + /** + * Non-perpetual mode, time out of 10 sec + * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask + * #RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)} + * @param queue queue + * @param rssFeed rssFeed + * @param publishedSince publishedSince + */ + public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, DateTime publishedSince) { + this(queue, rssFeed, publishedSince, DEFAULT_TIME_OUT, false); + } + + /** + * RssStreamProviderTask that reads an rss feed url and queues the resulting articles as StreamsDatums with the documents + * being object nodes. 
+ * @param queue Queue to push data to + * @param rssFeed url of rss feed to read + * @param publishedSince DateTime to filter articles by, will queue articles with published times after this + * @param timeOut url connection timeout in milliseconds + * @param perpetual true, if you want to run in perpetual mode. NOT RECOMMENDED + */ + public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, DateTime publishedSince, int timeOut, boolean perpetual) { + this.dataQueue = queue; + this.rssFeed = rssFeed; + this.timeOut = timeOut; + this.publishedSince = publishedSince; + this.serializer = new SyndEntrySerializer(); + this.perpetual = perpetual; + } + + /** + * The rss feed url that this task is responsible for reading. + * @return rss feed url + */ + public String getRssFeed() { + return this.rssFeed; + } + + @Override + public void run() { + try { + Set<String> batch = queueFeedEntries(new URL(this.rssFeed)); + if (this.perpetual) { + PREVIOUSLY_SEEN.put(this.getRssFeed(), batch); + } + } catch (IOException | FeedException ex) { + LOGGER.warn("Exception while reading rss stream, {} : {}", this.rssFeed, ex); } - - /** - * Reads the url and queues the data - * @param feedUrl rss feed url - * @return set of all article urls that were read from the feed - * @throws IOException when it cannot connect to the url or the url is malformed - * @throws FeedException when it cannot reed the feed. 
- */ - @VisibleForTesting - protected Set<String> queueFeedEntries(URL feedUrl) throws IOException, FeedException { - - // ConcurrentHashSet is preferable, but it's only in guava 15+ - // spark 1.5.0 uses guava 14 so for the moment this is the workaround - Set<String> batch = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); - URLConnection connection = feedUrl.openConnection(); - connection.setConnectTimeout(this.timeOut); - connection.setConnectTimeout(this.timeOut); - SyndFeedInput input = new SyndFeedInput(); - SyndFeed feed = input.build(new InputStreamReader(connection.getInputStream())); - for (Object entryObj : feed.getEntries()) { - SyndEntry entry = (SyndEntry) entryObj; - ObjectNode nodeEntry = this.serializer.deserialize(entry); - nodeEntry.put(RSS_KEY, this.rssFeed); - String entryId = determineId(nodeEntry); - batch.add(entryId); - StreamsDatum datum = new StreamsDatum(nodeEntry); - try { - JsonNode published = nodeEntry.get(DATE_KEY); - if (published != null) { - try { - DateTime date = RFC3339Utils.parseToUTC(published.asText()); - if (date.isAfter(this.publishedSince) && (!this.perpetual || !seenBefore(entryId, this.rssFeed))) { - this.dataQueue.put(datum); - LOGGER.debug("Added entry, {}, to provider queue.", entryId); - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - LOGGER.trace("Failed to parse date from object node, attempting to add node to queue by default."); - if(!this.perpetual || !seenBefore(entryId, this.rssFeed)) { - this.dataQueue.put(datum); - LOGGER.debug("Added entry, {}, to provider queue.", entryId); - } - } - } else { - LOGGER.debug("No published date present, attempting to add node to queue by default."); - if(!this.perpetual || !seenBefore(entryId, this.rssFeed)) { - this.dataQueue.put(datum); - LOGGER.debug("Added entry, {}, to provider queue.", entryId); - } - } - } catch (InterruptedException ie) { - LOGGER.error("Interupted Exception."); - 
Thread.currentThread().interrupt(); + } + + /** + * Reads the url and queues the data + * @param feedUrl rss feed url + * @return set of all article urls that were read from the feed + * @throws IOException when it cannot connect to the url or the url is malformed + * @throws FeedException when it cannot reed the feed. + */ + @VisibleForTesting + protected Set<String> queueFeedEntries(URL feedUrl) throws IOException, FeedException { + + // ConcurrentHashSet is preferable, but it's only in guava 15+ + // spark 1.5.0 uses guava 14 so for the moment this is the workaround + Set<String> batch = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); + URLConnection connection = feedUrl.openConnection(); + connection.setConnectTimeout(this.timeOut); + connection.setConnectTimeout(this.timeOut); + SyndFeedInput input = new SyndFeedInput(); + SyndFeed feed = input.build(new InputStreamReader(connection.getInputStream())); + for (Object entryObj : feed.getEntries()) { + SyndEntry entry = (SyndEntry) entryObj; + ObjectNode nodeEntry = this.serializer.deserialize(entry); + nodeEntry.put(RSS_KEY, this.rssFeed); + String entryId = determineId(nodeEntry); + batch.add(entryId); + StreamsDatum datum = new StreamsDatum(nodeEntry); + try { + JsonNode published = nodeEntry.get(DATE_KEY); + if (published != null) { + try { + DateTime date = RFC3339Utils.parseToUTC(published.asText()); + if (date.isAfter(this.publishedSince) && (!this.perpetual || !seenBefore(entryId, this.rssFeed))) { + this.dataQueue.put(datum); + LOGGER.debug("Added entry, {}, to provider queue.", entryId); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } catch (Exception ex) { + LOGGER.trace("Failed to parse date from object node, attempting to add node to queue by default."); + if (!this.perpetual || !seenBefore(entryId, this.rssFeed)) { + this.dataQueue.put(datum); + LOGGER.debug("Added entry, {}, to provider queue.", entryId); } + } + } else { + LOGGER.debug("No 
published date present, attempting to add node to queue by default."); + if (!this.perpetual || !seenBefore(entryId, this.rssFeed)) { + this.dataQueue.put(datum); + LOGGER.debug("Added entry, {}, to provider queue.", entryId); + } } - return batch; + } catch (InterruptedException ie) { + LOGGER.error("Interupted Exception."); + Thread.currentThread().interrupt(); + } } - - /** - * Returns a link to the article to use as the id - * @param node - * @return - */ - private String determineId(ObjectNode node) { - String id = null; - if(node.get(URI_KEY) != null && !node.get(URI_KEY).textValue().equals("")) { - id = node.get(URI_KEY).textValue(); - } else if(node.get(LINK_KEY) != null && !node.get(LINK_KEY).textValue().equals("")) { - id = node.get(LINK_KEY).textValue(); - } - return id; + return batch; + } + + /** + * Returns link to the article to use as the id. + * @param node node + * @return String + */ + private String determineId(ObjectNode node) { + String id = null; + if (node.get(URI_KEY) != null && !node.get(URI_KEY).textValue().equals("")) { + id = node.get(URI_KEY).textValue(); + } else if (node.get(LINK_KEY) != null && !node.get(LINK_KEY).textValue().equals("")) { + id = node.get(LINK_KEY).textValue(); } - - /** - * Returns false if the artile was previously seen in another task for this feed - * @param id - * @param rssFeed - * @return - */ - private boolean seenBefore(String id, String rssFeed) { - Set<String> previousBatch = PREVIOUSLY_SEEN.get(rssFeed); - if(previousBatch == null) { - return false; - } - return previousBatch.contains(id); + return id; + } + + /** + * Returns false if the artile was previously seen in another task for this feed. 
+ * @param id id + * @param rssFeed rssFeed + * @return boolean seenBefore + */ + private boolean seenBefore(String id, String rssFeed) { + Set<String> previousBatch = PREVIOUSLY_SEEN.get(rssFeed); + if (previousBatch == null) { + return false; } + return previousBatch.contains(id); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java index 99ccbf3..e4bfd35 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java @@ -15,12 +15,14 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.rss.provider.perpetual; -import com.google.common.collect.Maps; import org.apache.streams.core.StreamsDatum; import org.apache.streams.rss.FeedDetails; import org.apache.streams.rss.provider.RssStreamProviderTask; + +import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,82 +33,92 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; /** - * + * RssFeedScheduler launches threads to collect data from rss feeds. 
*/ public class RssFeedScheduler implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(RssFeedScheduler.class); - private static final int DEFAULT_PEROID = 10; // 1 minute + private static final Logger LOGGER = LoggerFactory.getLogger(RssFeedScheduler.class); + private static final int DEFAULT_PEROID = 10; // 1 minute - private ExecutorService service; - private List<FeedDetails> feedDetailsList; - private int peroid; - private AtomicBoolean keepRunning; - private AtomicBoolean complete; - private Map<String, Long> lastScheduled; - private BlockingQueue<StreamsDatum> dataQueue; + private ExecutorService service; + private List<FeedDetails> feedDetailsList; + private int peroid; + private AtomicBoolean keepRunning; + private AtomicBoolean complete; + private Map<String, Long> lastScheduled; + private BlockingQueue<StreamsDatum> dataQueue; - public RssFeedScheduler(ExecutorService service, List<FeedDetails> feedDetailsList, BlockingQueue<StreamsDatum> dataQueue) { - this(service, feedDetailsList, dataQueue, DEFAULT_PEROID); - } + public RssFeedScheduler(ExecutorService service, List<FeedDetails> feedDetailsList, BlockingQueue<StreamsDatum> dataQueue) { + this(service, feedDetailsList, dataQueue, DEFAULT_PEROID); + } - public RssFeedScheduler(ExecutorService service, List<FeedDetails> feedDetailsList, BlockingQueue<StreamsDatum> dataQueue, int peroid) { - this.service = service; - this.feedDetailsList = feedDetailsList; - this.peroid = peroid; - this.keepRunning = new AtomicBoolean(true); - this.lastScheduled = Maps.newHashMap(); - this.dataQueue = dataQueue; - this.complete = new AtomicBoolean(false); - } + /** + * RssFeedScheduler constructor. 
+ * @param service service + * @param feedDetailsList feedDetailsList + * @param dataQueue dataQueue + * @param peroid peroid + */ + public RssFeedScheduler(ExecutorService service, List<FeedDetails> feedDetailsList, BlockingQueue<StreamsDatum> dataQueue, int peroid) { + this.service = service; + this.feedDetailsList = feedDetailsList; + this.peroid = peroid; + this.keepRunning = new AtomicBoolean(true); + this.lastScheduled = Maps.newHashMap(); + this.dataQueue = dataQueue; + this.complete = new AtomicBoolean(false); + } - public void stop() { - this.keepRunning.set(false); - } + public void stop() { + this.keepRunning.set(false); + } - public boolean isComplete() { - return this.complete.get(); - } + public boolean isComplete() { + return this.complete.get(); + } - @Override - public void run() { - this.complete.set(false); - try { - if(this.peroid <= 0) { - scheduleFeeds(); - } else { - while (this.keepRunning.get()) { - scheduleFeeds(); - Thread.sleep(this.peroid * 60000); - } - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } finally { - this.service = null; - LOGGER.info("{} completed scheduling of feeds.", this.getClass().getName()); - this.complete.set(true); + @Override + public void run() { + this.complete.set(false); + try { + if (this.peroid <= 0) { + scheduleFeeds(); + } else { + while (this.keepRunning.get()) { + scheduleFeeds(); + Thread.sleep(this.peroid * 60000); } + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } finally { + this.service = null; + LOGGER.info("{} completed scheduling of feeds.", this.getClass().getName()); + this.complete.set(true); } + } - public void scheduleFeeds() { - for(FeedDetails detail : this.feedDetailsList) { - Long lastTime = null; - if((lastTime = this.lastScheduled.get(detail.getUrl())) == null) { - lastTime = 0L; - } - long currentTime = System.currentTimeMillis(); - long pollInterval; - if(detail.getPollIntervalMillis() == null) { - pollInterval = 
0; - } else { - pollInterval = detail.getPollIntervalMillis(); - } - if(currentTime - lastTime > pollInterval) { - this.service.execute(new RssStreamProviderTask(this.dataQueue, detail.getUrl())); - this.LOGGER.trace("Scheduled data collection on rss feed, {}", detail.getUrl()); - this.lastScheduled.put(detail.getUrl(), currentTime); - } - } + /** + * Submits a collection task for each feed whose poll interval has elapsed since it was last scheduled. + */ + public void scheduleFeeds() { + for (FeedDetails detail : this.feedDetailsList) { + Long lastTime = null; + if ((lastTime = this.lastScheduled.get(detail.getUrl())) == null) { + lastTime = 0L; + } + long currentTime = System.currentTimeMillis(); + long pollInterval; + if (detail.getPollIntervalMillis() == null) { + pollInterval = 0; + } else { + pollInterval = detail.getPollIntervalMillis(); + } + if (currentTime - lastTime > pollInterval) { + this.service.execute(new RssStreamProviderTask(this.dataQueue, detail.getUrl())); + this.LOGGER.trace("Scheduled data collection on rss feed, {}", detail.getUrl()); + this.lastScheduled.put(detail.getUrl(), currentTime); + } } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java index e323f27..1e3aedd 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java @@ -18,11 +18,6 @@ package org.apache.streams.rss.serializer; -import com.fasterxml.jackson.databind.JsonNode; -import
com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; import org.apache.streams.data.ActivitySerializer; import org.apache.streams.data.util.RFC3339Utils; import org.apache.streams.jackson.StreamsJacksonMapper; @@ -30,6 +25,12 @@ import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.pojo.json.Author; import org.apache.streams.pojo.json.Provider; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.slf4j.Logger; @@ -40,194 +41,200 @@ import java.util.List; public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNode> { - private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivitySerializer.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivitySerializer.class); - private boolean includeRomeExtension; + private boolean includeRomeExtension; - public SyndEntryActivitySerializer() { - this(true); - } - - public SyndEntryActivitySerializer(boolean includeRomeExtension) { - this.includeRomeExtension = includeRomeExtension; - } + public SyndEntryActivitySerializer() { + this(true); + } + public SyndEntryActivitySerializer(boolean includeRomeExtension) { + this.includeRomeExtension = includeRomeExtension; + } - @Override - public List<Activity> deserializeAll(List<ObjectNode> objectNodes) { - List<Activity> result = new LinkedList<>(); - for (ObjectNode node : objectNodes) { - result.add(deserialize(node)); - } - return result; + @Override + public List<Activity> deserializeAll(List<ObjectNode> 
objectNodes) { + List<Activity> result = new LinkedList<>(); + for (ObjectNode node : objectNodes) { + result.add(deserialize(node)); } - - @Override - public String serializationFormat() { - return "application/streams-provider-rss"; + return result; + } + + @Override + public String serializationFormat() { + return "application/streams-provider-rss"; + } + + @Override + public ObjectNode serialize(Activity deserialized) { + throw new UnsupportedOperationException("Cannot currently serialize to Rome"); + } + + @Override + public Activity deserialize(ObjectNode syndEntry) { + return deserializeWithRomeExtension(syndEntry, this.includeRomeExtension); + } + + /** + * Converts an RSS entry into an Activity, optionally attaching the entry as the "rome" extension. + * @param entry RSS entry as an ObjectNode; must not be null + * @param withExtension whether to add the Rome extension + * @return the resulting Activity + */ + public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) { + Preconditions.checkNotNull(entry); + + Activity activity = new Activity(); + Provider provider = buildProvider(entry); + ActivityObject actor = buildActor(entry); + ActivityObject activityObject = buildActivityObject(entry); + + activityObject.setUrl(provider.getUrl()); + activityObject.setAuthor(actor.getAuthor()); + + activity.setUrl(provider.getUrl()); + activity.setProvider(provider); + activity.setActor(actor); + activity.setVerb("post"); + activity.setId("id:rss:post:" + activity.getUrl()); + + JsonNode published = entry.get("publishedDate"); + if (published != null) { + try { + activity.setPublished(RFC3339Utils.parseToUTC(published.textValue())); + } catch (Exception ex) { + LOGGER.warn("Failed to parse date : {}", published.textValue()); + + DateTime now = DateTime.now().withZone(DateTimeZone.UTC); + activity.setPublished(now); + } } - @Override - public ObjectNode serialize(Activity deserialized) { - throw new UnsupportedOperationException("Cannot currently serialize to Rome"); - } + activity.setUpdated(activityObject.getUpdated()); +
activity.setObject(activityObject); - @Override - public Activity deserialize(ObjectNode syndEntry) { - return deserializeWithRomeExtension(syndEntry, this.includeRomeExtension); + if (withExtension) { + activity = addRomeExtension(activity, entry); } - public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) { - Preconditions.checkNotNull(entry); - - Activity activity = new Activity(); - Provider provider = buildProvider(entry); - ActivityObject actor = buildActor(entry); - ActivityObject activityObject = buildActivityObject(entry); - - activityObject.setUrl(provider.getUrl()); - activityObject.setAuthor(actor.getAuthor()); - - activity.setUrl(provider.getUrl()); - activity.setProvider(provider); - activity.setActor(actor); - activity.setVerb("post"); - activity.setId("id:rss:post:" + activity.getUrl()); - - JsonNode published = entry.get("publishedDate"); - if (published != null) { - try { - activity.setPublished(RFC3339Utils.parseToUTC(published.textValue())); - } catch (Exception e) { - LOGGER.warn("Failed to parse date : {}", published.textValue()); - - DateTime now = DateTime.now().withZone(DateTimeZone.UTC); - activity.setPublished(now); - } - } - - activity.setUpdated(activityObject.getUpdated()); - activity.setObject(activityObject); - - if (withExtension) { - activity = addRomeExtension(activity, entry); - } - - return activity; + return activity; + } + + /** + * Given an RSS entry, extract the author and actor information and return it + * in an actor object. + * + * @param entry entry + * @return $.actor + */ + private ActivityObject buildActor(ObjectNode entry) { + ActivityObject actor = new ActivityObject(); + Author author = new Author(); + + if (entry.get("author") != null) { + author.setId(entry.get("author").textValue()); + author.setDisplayName(entry.get("author").textValue()); + + actor.setAuthor(author); + String uriToSet = entry.get("rssFeed") != null ?
entry.get("rssFeed").asText() : null; + + actor.setId("id:rss:" + uriToSet + ":" + author.getId()); + actor.setDisplayName(author.getDisplayName()); } - /** - * Given an RSS entry, extra out the author and actor information and return it - * in an actor object - * - * @param entry - * @return - */ - private ActivityObject buildActor(ObjectNode entry) { - ActivityObject actor = new ActivityObject(); - Author author = new Author(); - - if (entry.get("author") != null) { - author.setId(entry.get("author").textValue()); - author.setDisplayName(entry.get("author").textValue()); - - actor.setAuthor(author); - String uriToSet = entry.get("rssFeed") != null ? entry.get("rssFeed").asText() : null; - - actor.setId("id:rss:" + uriToSet + ":" + author.getId()); - actor.setDisplayName(author.getDisplayName()); - } - - return actor; + return actor; + } + + /** + * Given an RSS object, build the ActivityObject. + * + * @param entry ObjectNode + * @return $.object + */ + private ActivityObject buildActivityObject(ObjectNode entry) { + ActivityObject activityObject = new ActivityObject(); + + JsonNode summary = entry.get("description"); + if (summary != null) { + activityObject.setSummary(summary.textValue()); + } else if ((summary = entry.get("title")) != null) { + activityObject.setSummary(summary.textValue()); } - /** - * Given an RSS object, build the ActivityObject - * - * @param entry - * @return - */ - private ActivityObject buildActivityObject(ObjectNode entry) { - ActivityObject activityObject = new ActivityObject(); + return activityObject; + } - JsonNode summary = entry.get("description"); - if (summary != null) - activityObject.setSummary(summary.textValue()); - else if((summary = entry.get("title")) != null) { - activityObject.setSummary(summary.textValue()); - } + /** + * Given an RSS object, build and return the Provider object. 
+ * + * @param entry ObjectNode + * @return $.provider + */ + private Provider buildProvider(ObjectNode entry) { + Provider provider = new Provider(); - return activityObject; - } + String link = null; + String uri = null; + String resourceLocation = null; - /** - * Given an RSS object, build and return the Provider object - * - * @param entry - * @return - */ - private Provider buildProvider(ObjectNode entry) { - Provider provider = new Provider(); - - String link = null; - String uri = null; - String resourceLocation = null; - - if (entry.get("link") != null) - link = entry.get("link").textValue(); - if (entry.get("uri") != null) - uri = entry.get("uri").textValue(); - - /* - * Order of precedence for resourceLocation selection - * - * 1. Valid URI - * 2. Valid Link - * 3. Non-null URI - * 4. Non-null Link - */ - if(isValidResource(uri)) - resourceLocation = uri; - else if(isValidResource(link)) - resourceLocation = link; - else if(uri != null || link != null) { - resourceLocation = (uri != null) ? uri : link; - } - - provider.setId("id:providers:rss"); - provider.setUrl(resourceLocation); - provider.setDisplayName("RSS"); - - return provider; + if (entry.get("link") != null) { + link = entry.get("link").textValue(); } - - /** - * Tests whether or not the passed in resource is a valid URI - * @param resource - * @return boolean of whether or not the resource is valid - */ - private boolean isValidResource(String resource) { - return resource != null && (resource.startsWith("http") || resource.startsWith("www")); + if (entry.get("uri") != null) { + uri = entry.get("uri").textValue(); } - - /** - * Given an RSS object and an existing activity, - * add the Rome extension to that activity and return it + /* + * Order of precedence for resourceLocation selection * - * @param activity - * @param entry - * @return + * 1. Valid URI + * 2. Valid Link + * 3. Non-null URI + * 4. 
Non-null Link */ - private Activity addRomeExtension(Activity activity, ObjectNode entry) { - ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - ObjectNode activityRoot = mapper.convertValue(activity, ObjectNode.class); - ObjectNode extensions = JsonNodeFactory.instance.objectNode(); - - extensions.put("rome", entry); - activityRoot.put("extensions", extensions); - - activity = mapper.convertValue(activityRoot, Activity.class); - - return activity; + if (isValidResource(uri)) { + resourceLocation = uri; + } else if (isValidResource(link)) { + resourceLocation = link; + } else if (uri != null || link != null) { + resourceLocation = (uri != null) ? uri : link; } + + provider.setId("id:providers:rss"); + provider.setUrl(resourceLocation); + provider.setDisplayName("RSS"); + + return provider; + } + + /** + * Tests whether or not the passed in resource is a valid URI. + * @param resource resource + * @return boolean of whether or not the resource is valid + */ + private boolean isValidResource(String resource) { + return resource != null && (resource.startsWith("http") || resource.startsWith("www")); + } + + /** + * Given an RSS object and an existing activity, + * add the Rome extension to that activity and return it. 
+ * + * @param activity Activity + * @param entry ObjectNode + * @return Activity + */ + private Activity addRomeExtension(Activity activity, ObjectNode entry) { + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + ObjectNode activityRoot = mapper.convertValue(activity, ObjectNode.class); + ObjectNode extensions = JsonNodeFactory.instance.objectNode(); + + extensions.put("rome", entry); + activityRoot.put("extensions", extensions); + + activity = mapper.convertValue(activityRoot, Activity.class); + + return activity; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java index 1135172..6868bfc 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java @@ -26,7 +26,12 @@ import com.sun.syndication.feed.module.Module; import com.sun.syndication.feed.rss.Category; import com.sun.syndication.feed.rss.Content; import com.sun.syndication.feed.rss.Enclosure; -import com.sun.syndication.feed.synd.*; +import com.sun.syndication.feed.synd.SyndContent; +import com.sun.syndication.feed.synd.SyndEnclosure; +import com.sun.syndication.feed.synd.SyndEntry; +import com.sun.syndication.feed.synd.SyndFeed; +import com.sun.syndication.feed.synd.SyndImage; +import com.sun.syndication.feed.synd.SyndLinkImpl; import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; import org.slf4j.Logger; @@ -42,267 +47,284 @@ import java.util.List; 
*/ public class SyndEntrySerializer { - private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntrySerializer.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntrySerializer.class); - public ObjectNode deserialize(SyndEntry entry) { - return deserializeRomeEntry(entry); - } - - - public List<ObjectNode> deserializeAll(Collection<SyndEntry> entries) { - List<ObjectNode> result = Lists.newLinkedList(); - for(SyndEntry entry : entries) { - result.add(deserialize(entry)); - } - return result; - } + public ObjectNode deserialize(SyndEntry entry) { + return deserializeRomeEntry(entry); + } + private ObjectNode deserializeRomeEntry(SyndEntry entry) { + JsonNodeFactory factory = JsonNodeFactory.instance; + ObjectNode root = factory.objectNode(); + serializeString(entry.getAuthor(), "author", root); + serializeListOfStrings(entry.getAuthors(), "authors", root, factory); + serializeCategories(root, factory, entry.getCategories()); + serializeContents(root, factory, entry.getContents()); + serializeListOfStrings(entry.getContributors(), "contributors", root, factory); + serializeDescription(root, factory, entry.getDescription()); + serializeEnclosures(root, factory, entry.getEnclosures()); + serializeForeignMarkUp(root, factory, entry.getForeignMarkup()); + serializeString(entry.getLink(), "link", root); + serializeLinks(root, factory, entry.getLinks()); + serializeModules(root, factory, entry.getModules()); + serializeDate(root, entry.getPublishedDate(), "publishedDate"); + serializeSource(root, factory, entry.getSource()); + serializeString(entry.getTitle(), "title", root); + serializeDate(root, entry.getUpdatedDate(), "updateDate"); + serializeString(entry.getUri(), "uri", root); - private ObjectNode deserializeRomeEntry(SyndEntry entry) { - JsonNodeFactory factory = JsonNodeFactory.instance; - ObjectNode root = factory.objectNode(); + return root; + } - serializeString(entry.getAuthor(), "author", root); - 
serializeListOfStrings(entry.getAuthors(), "authors", root, factory); - serializeCategories(root, factory, entry.getCategories()); - serializeContents(root, factory, entry.getContents()); - serializeListOfStrings(entry.getContributors(), "contributors", root, factory); - serializeDescription(root, factory, entry.getDescription()); - serializeEnclosures(root, factory, entry.getEnclosures()); - serializeForeignMarkUp(root, factory, entry.getForeignMarkup()); - serializeString(entry.getLink(), "link", root); - serializeLinks(root, factory, entry.getLinks()); - serializeModules(root, factory, entry.getModules()); - serializeDate(root, entry.getPublishedDate(), "publishedDate"); - serializeSource(root, factory, entry.getSource()); - serializeString(entry.getTitle(), "title", root); - serializeDate(root, entry.getUpdatedDate(), "updateDate"); - serializeString(entry.getUri(), "uri", root); - return root; + private void serializeCategories(ObjectNode root, JsonNodeFactory factory, List categories) { + if (categories == null || categories.size() == 0) { + return; } - - - private void serializeCategories(ObjectNode root, JsonNodeFactory factory, List categories) { - if(categories == null || categories.size() == 0) - return; - ArrayNode cats = factory.arrayNode(); - for(Object obj : categories) { - if(obj instanceof Category) { - ObjectNode catNode = factory.objectNode(); - Category category = (Category) obj; - if(category.getDomain() != null) - catNode.put("domain", category.getDomain()); - if(category.getValue() != null) - catNode.put("value", category.getValue()); - cats.add(catNode); - } - else if(obj instanceof com.sun.syndication.feed.atom.Category) { - com.sun.syndication.feed.atom.Category category = (com.sun.syndication.feed.atom.Category) obj; - ObjectNode catNode = factory.objectNode(); - if(category.getLabel() != null) - catNode.put("label", category.getLabel()); - if(category.getScheme() != null) - catNode.put("scheme", category.getScheme()); - 
if(category.getSchemeResolved() != null) - catNode.put("schemeResolved", category.getSchemeResolved()); - if(category.getTerm() != null ) - catNode.put("term", category.getTerm()); - cats.add(catNode); - } + ArrayNode cats = factory.arrayNode(); + for (Object obj : categories) { + if (obj instanceof Category) { + ObjectNode catNode = factory.objectNode(); + Category category = (Category) obj; + if (category.getDomain() != null) { + catNode.put("domain", category.getDomain()); + } + if (category.getValue() != null) { + catNode.put("value", category.getValue()); } - root.put("categories", cats); + cats.add(catNode); + } else if (obj instanceof com.sun.syndication.feed.atom.Category) { + com.sun.syndication.feed.atom.Category category = (com.sun.syndication.feed.atom.Category) obj; + ObjectNode catNode = factory.objectNode(); + if (category.getLabel() != null) { + catNode.put("label", category.getLabel()); + } + if (category.getScheme() != null) { + catNode.put("scheme", category.getScheme()); + } + if (category.getSchemeResolved() != null) { + catNode.put("schemeResolved", category.getSchemeResolved()); + } + if (category.getTerm() != null ) { + catNode.put("term", category.getTerm()); + } + cats.add(catNode); + } } + root.put("categories", cats); + } - private void serializeContents(ObjectNode root, JsonNodeFactory factory, List contents) { - if(contents == null || contents.size() == 0) - return; - ArrayNode contentsArray = factory.arrayNode(); - for(Object obj : contents) { - ObjectNode content = factory.objectNode(); - if(obj instanceof Content) { - Content rssContent = (Content) obj; - content.put("type", rssContent.getType()); - content.put("value", rssContent.getValue()); - } - if(obj instanceof com.sun.syndication.feed.atom.Content) { - com.sun.syndication.feed.atom.Content atomContent = (com.sun.syndication.feed.atom.Content) obj; - content.put("type", atomContent.getType()); - content.put("value", atomContent.getValue()); - content.put("mode", 
atomContent.getMode()); - content.put("src", atomContent.getSrc()); - } - contentsArray.add(content); - } - root.put("contents", contentsArray); + private void serializeContents(ObjectNode root, JsonNodeFactory factory, List contents) { + if (contents == null || contents.size() == 0) { + return; + } + ArrayNode contentsArray = factory.arrayNode(); + for (Object obj : contents) { + ObjectNode content = factory.objectNode(); + if (obj instanceof Content) { + Content rssContent = (Content) obj; + content.put("type", rssContent.getType()); + content.put("value", rssContent.getValue()); + } + if (obj instanceof com.sun.syndication.feed.atom.Content) { + com.sun.syndication.feed.atom.Content atomContent = (com.sun.syndication.feed.atom.Content) obj; + content.put("type", atomContent.getType()); + content.put("value", atomContent.getValue()); + content.put("mode", atomContent.getMode()); + content.put("src", atomContent.getSrc()); + } + contentsArray.add(content); } + root.put("contents", contentsArray); + } - private void serializeDate(ObjectNode root, Date date, String key) { - DateTimeFormatter formatter = ISODateTimeFormat.dateTime(); - if(date == null) - return; - root.put(key, formatter.print(date.getTime())); + private void serializeDate(ObjectNode root, Date date, String key) { + DateTimeFormatter formatter = ISODateTimeFormat.dateTime(); + if (date == null) { + return; } + root.put(key, formatter.print(date.getTime())); + } - private void serializeDescription(ObjectNode root, JsonNodeFactory factory, SyndContent synd) { - if(synd == null) - return; - ObjectNode content = factory.objectNode(); - if(synd.getValue() != null) - content.put("value", synd.getValue()); - if(synd.getMode() != null) - content.put("mode", synd.getMode()); - if(synd.getType() != null) - content.put("type", synd.getType()); - root.put("description", content); + private void serializeDescription(ObjectNode root, JsonNodeFactory factory, SyndContent synd) { + if (synd == null) { + return; + } 
+ ObjectNode content = factory.objectNode(); + if (synd.getValue() != null) { + content.put("value", synd.getValue()); } + if (synd.getMode() != null) { + content.put("mode", synd.getMode()); + } + if (synd.getType() != null) { + content.put("type", synd.getType()); + } + root.put("description", content); + } - private void serializeEnclosures(ObjectNode root, JsonNodeFactory factory, List enclosures) { - if(enclosures == null || enclosures.size() == 0) - return; - ArrayNode encls = factory.arrayNode(); - for(Object obj : enclosures) { - if(obj instanceof Enclosure){ - Enclosure enclosure = (Enclosure) obj; - ObjectNode encl = factory.objectNode(); - if(enclosure.getType() != null) - encl.put("type", enclosure.getType()); - if(enclosure.getUrl() != null) - encl.put("url", enclosure.getUrl()); - encl.put("length", enclosure.getLength()); - encls.add(encl); - } else if(obj instanceof SyndEnclosure) { - SyndEnclosure enclosure = (SyndEnclosure) obj; - ObjectNode encl = factory.objectNode(); - if(enclosure.getType() != null) - encl.put("type", enclosure.getType()); - if(enclosure.getUrl() != null) - encl.put("url", enclosure.getUrl()); - encl.put("length", enclosure.getLength()); - encls.add(encl); - } else { - LOGGER.warn("serializeEnclosures does not handle type : {}", obj.getClass().toString()); - } + private void serializeEnclosures(ObjectNode root, JsonNodeFactory factory, List enclosures) { + if (enclosures == null || enclosures.size() == 0) { + return; + } + ArrayNode encls = factory.arrayNode(); + for (Object obj : enclosures) { + if (obj instanceof Enclosure) { + Enclosure enclosure = (Enclosure) obj; + ObjectNode encl = factory.objectNode(); + if (enclosure.getType() != null) { + encl.put("type", enclosure.getType()); + } + if (enclosure.getUrl() != null) { + encl.put("url", enclosure.getUrl()); } - root.put("enclosures", encls); + encl.put("length", enclosure.getLength()); + encls.add(encl); + } else if (obj instanceof SyndEnclosure) { + SyndEnclosure 
enclosure = (SyndEnclosure) obj; + ObjectNode encl = factory.objectNode(); + if (enclosure.getType() != null) { + encl.put("type", enclosure.getType()); + } + if (enclosure.getUrl() != null) { + encl.put("url", enclosure.getUrl()); + } + encl.put("length", enclosure.getLength()); + encls.add(encl); + } else { + LOGGER.warn("serializeEnclosures does not handle type : {}", obj.getClass().toString()); + } } + root.put("enclosures", encls); + } - private void serializeForeignMarkUp(ObjectNode root, JsonNodeFactory factory, Object foreignMarkUp) { - if(foreignMarkUp == null) - return; - if(foreignMarkUp instanceof String) { - root.put("foreignEnclosures", (String) foreignMarkUp); - } else if (foreignMarkUp instanceof List) { - List foreignList = (List) foreignMarkUp; - if(foreignList.size() == 0) - return; - if(foreignList.get(0) instanceof String) { - serializeListOfStrings(foreignList, "foreignEnclosures", root, factory); - } else { - LOGGER.debug("SyndEntry.getForeignMarkUp is not of type String. Need to handle the case of class : {}", ((List)foreignMarkUp).get(0).getClass().toString()); - } - } else { - LOGGER.debug("SyndEntry.getForeignMarkUp is not of an expected type. Need to handle the case of class : {}", foreignMarkUp.getClass().toString()); - } + private void serializeForeignMarkUp(ObjectNode root, JsonNodeFactory factory, Object foreignMarkUp) { + if (foreignMarkUp == null) { + return; } + if (foreignMarkUp instanceof String) { + root.put("foreignEnclosures", (String) foreignMarkUp); + } else if (foreignMarkUp instanceof List) { + List foreignList = (List) foreignMarkUp; + if (foreignList.size() == 0) { + return; + } + if (foreignList.get(0) instanceof String) { + serializeListOfStrings(foreignList, "foreignEnclosures", root, factory); + } else { + LOGGER.debug("SyndEntry.getForeignMarkUp is not of type String. 
Need to handle the case of class : {}", + ((List)foreignMarkUp).get(0).getClass().toString()); + } + } else { + LOGGER.debug("SyndEntry.getForeignMarkUp is not of an expected type. Need to handle the case of class : {}", + foreignMarkUp.getClass().toString()); + } + } - private void serializeImage(ObjectNode root, JsonNodeFactory factory, SyndImage image) { - if(image == null) - return; - ObjectNode imageNode = factory.objectNode(); - serializeString(image.getDescription(), "description", imageNode); - serializeString(image.getLink(), "link", imageNode); - serializeString(image.getUrl(), "url", imageNode); - serializeString(image.getTitle(), "title", imageNode); - root.put("image", imageNode); + private void serializeImage(ObjectNode root, JsonNodeFactory factory, SyndImage image) { + if (image == null) { + return; } + ObjectNode imageNode = factory.objectNode(); + serializeString(image.getDescription(), "description", imageNode); + serializeString(image.getLink(), "link", imageNode); + serializeString(image.getUrl(), "url", imageNode); + serializeString(image.getTitle(), "title", imageNode); + root.put("image", imageNode); + } - private void serializeListOfStrings(List toSerialize, String key, ObjectNode node, JsonNodeFactory factory) { - if(toSerialize == null || toSerialize.size() == 0) - return; - ArrayNode keyNode = factory.arrayNode(); - for(Object obj : toSerialize) { - if(obj instanceof String) { - keyNode.add((String) obj); - } else { - LOGGER.debug("Array at Key:{} was expecting item types of String. 
Received class : {}", key, obj.getClass().toString()); - } - } - node.put(key, keyNode); + private void serializeListOfStrings(List toSerialize, String key, ObjectNode node, JsonNodeFactory factory) { + if (toSerialize == null || toSerialize.size() == 0) { + return; } + ArrayNode keyNode = factory.arrayNode(); + for (Object obj : toSerialize) { + if (obj instanceof String) { + keyNode.add((String) obj); + } else { + LOGGER.debug("Array at Key:{} was expecting item types of String. Received class : {}", key, obj.getClass().toString()); + } + } + node.put(key, keyNode); + } - private void serializeLinks(ObjectNode root, JsonNodeFactory factory, List links) { - if(links == null || links.size() == 0) { - return; - } else if(links.get(0) instanceof String) { - serializeListOfStrings(links, "links", root, factory); - } else if(links.get(0) instanceof SyndLinkImpl) { - ArrayNode linksArray = factory.arrayNode(); - SyndLinkImpl syndLink; - ObjectNode linkNode; - for(Object obj : links) { - linkNode = factory.objectNode(); - syndLink = (SyndLinkImpl) obj; - linkNode.put("rel", syndLink.getRel()); - linkNode.put("href", syndLink.getHref()); - linkNode.put("type", syndLink.getType()); - linkNode.put("length", syndLink.getLength()); - linkNode.put("hrefLang", syndLink.getHreflang()); - linkNode.put("title", syndLink.getTitle()); - linksArray.add(linkNode); - } - root.put("links", linksArray); - } else { - LOGGER.error("No implementation for handling links of class : {}", links.get(0).getClass().toString()); - } + private void serializeLinks(ObjectNode root, JsonNodeFactory factory, List links) { + if (links == null || links.size() == 0) { + return; + } else if (links.get(0) instanceof String) { + serializeListOfStrings(links, "links", root, factory); + } else if (links.get(0) instanceof SyndLinkImpl) { + ArrayNode linksArray = factory.arrayNode(); + SyndLinkImpl syndLink; + ObjectNode linkNode; + for (Object obj : links) { + linkNode = factory.objectNode(); + syndLink = 
(SyndLinkImpl) obj; + linkNode.put("rel", syndLink.getRel()); + linkNode.put("href", syndLink.getHref()); + linkNode.put("type", syndLink.getType()); + linkNode.put("length", syndLink.getLength()); + linkNode.put("hrefLang", syndLink.getHreflang()); + linkNode.put("title", syndLink.getTitle()); + linksArray.add(linkNode); + } + root.put("links", linksArray); + } else { + LOGGER.error("No implementation for handling links of class : {}", links.get(0).getClass().toString()); } + } - private void serializeModules(ObjectNode root, JsonNodeFactory factory, List modules) { - if(modules == null || modules.size() == 0) - return; - ArrayNode modulesArray = factory.arrayNode(); - for(Object obj : modules) { - if(obj instanceof Module) { - Module mod = (Module) obj; - if(mod.getUri() != null) - modulesArray.add(mod.getUri()); - } else { - LOGGER.debug("SyndEntry.getModules() items are not of type Module. Need to handle the case of class : {}", obj.getClass().toString()); - } + private void serializeModules(ObjectNode root, JsonNodeFactory factory, List modules) { + if (modules == null || modules.size() == 0) { + return; + } + ArrayNode modulesArray = factory.arrayNode(); + for (Object obj : modules) { + if (obj instanceof Module) { + Module mod = (Module) obj; + if (mod.getUri() != null) { + modulesArray.add(mod.getUri()); } - root.put("modules", modulesArray); + } else { + LOGGER.debug("SyndEntry.getModules() items are not of type Module. 
Need to handle the case of class : {}", + obj.getClass().toString()); + } } + root.put("modules", modulesArray); + } - private void serializeSource(ObjectNode root, JsonNodeFactory factory, SyndFeed source) { - if(source == null) - return; - ObjectNode sourceNode = factory.objectNode(); - serializeString(source.getAuthor(), "author", sourceNode); - serializeListOfStrings(source.getAuthors(), "authors", sourceNode, factory); - serializeCategories(sourceNode, factory, source.getCategories()); - serializeString(source.getCopyright(), "copyright", sourceNode); - serializeListOfStrings(source.getContributors(), "contributors", sourceNode, factory); - serializeString(source.getDescription(), "description", sourceNode); - serializeDescription(sourceNode, factory, source.getDescriptionEx()); - // source.getEntries(); wtf? - serializeString(source.getFeedType(), "feedType", sourceNode); - serializeImage(sourceNode, factory, source.getImage()); - serializeForeignMarkUp(sourceNode, factory, source.getForeignMarkup()); - serializeString(source.getLanguage(), "language", sourceNode); - serializeString(source.getLink(), "link", sourceNode); - serializeListOfStrings(source.getLinks(), "links", sourceNode, factory); - serializeModules(sourceNode, factory, source.getModules()); - serializeDate(sourceNode, source.getPublishedDate(), "publishedDate"); - serializeString(source.getTitle(), "title", sourceNode); - serializeString(source.getUri(), "uri", sourceNode); - - root.put("source", sourceNode); + private void serializeSource(ObjectNode root, JsonNodeFactory factory, SyndFeed source) { + if (source == null) { + return; } + ObjectNode sourceNode = factory.objectNode(); + serializeString(source.getAuthor(), "author", sourceNode); + serializeListOfStrings(source.getAuthors(), "authors", sourceNode, factory); + serializeCategories(sourceNode, factory, source.getCategories()); + serializeString(source.getCopyright(), "copyright", sourceNode); + 
serializeListOfStrings(source.getContributors(), "contributors", sourceNode, factory); + serializeString(source.getDescription(), "description", sourceNode); + serializeDescription(sourceNode, factory, source.getDescriptionEx()); + // source.getEntries(); wtf? + serializeString(source.getFeedType(), "feedType", sourceNode); + serializeImage(sourceNode, factory, source.getImage()); + serializeForeignMarkUp(sourceNode, factory, source.getForeignMarkup()); + serializeString(source.getLanguage(), "language", sourceNode); + serializeString(source.getLink(), "link", sourceNode); + serializeListOfStrings(source.getLinks(), "links", sourceNode, factory); + serializeModules(sourceNode, factory, source.getModules()); + serializeDate(sourceNode, source.getPublishedDate(), "publishedDate"); + serializeString(source.getTitle(), "title", sourceNode); + serializeString(source.getUri(), "uri", sourceNode); + + root.put("source", sourceNode); + } - private void serializeString(String string, String key, ObjectNode node) { - if(string != null && !string.equals("")) - node.put(key, string); + private void serializeString(String string, String key, ObjectNode node) { + if (string != null && !string.equals("")) { + node.put(key, string); } + } }
