http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java index cb71c90..6722f3c 100644 --- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java @@ -15,13 +15,14 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.rss.provider; import org.apache.streams.core.StreamsDatum; + import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import java.net.URL; @@ -37,111 +38,113 @@ import static org.junit.Assert.assertTrue; */ public class RssStreamProviderTaskIT { - - @Before - public void clearPreviouslySeen() { - //some test runners run in parallel so needs to be synchronized - //if tests are run in parallel test will have undetermined results. - synchronized (RssStreamProviderTask.PREVIOUSLY_SEEN) { - RssStreamProviderTask.PREVIOUSLY_SEEN.clear(); - } + /** + * clearPreviouslySeen. + */ + @Before + public void clearPreviouslySeen() { + //some test runners run in parallel so needs to be synchronized + //if tests are run in parallel test will have undetermined results. + synchronized (RssStreamProviderTask.PREVIOUSLY_SEEN) { + RssStreamProviderTask.PREVIOUSLY_SEEN.clear(); } + } - /** - * Test that a task can read a valid rss from a url and queue the data - * @throws Exception - */ - @Test - public void testNonPerpetualNoTimeFramePull() throws Exception { - com.healthmarketscience.common.util.resource.Handler.init(); - BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(); - RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url"); - Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); - assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size()); - RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch); - //Test that it will out previously seen articles - queue.clear(); - batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); - assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size()); - } + /** + * Test that a task can read a valid rss from a url and queue the data. + * @throws Exception Exception + */ + @Test + public void testNonPerpetualNoTimeFramePull() throws Exception { + com.healthmarketscience.common.util.resource.Handler.init(); + BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(); + RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url"); + Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); + assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size()); + RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch); + //Test that it will out previously seen articles + queue.clear(); + batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); + assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size()); + } - /** - * Test that perpetual streams will not output previously seen articles - * @throws Exception - */ - @Test - public void testPerpetualNoTimeFramePull() throws Exception { - com.healthmarketscience.common.util.resource.Handler.init(); - BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(); - RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", new DateTime().minusYears(5), 10000, true); - Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); - assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size()); - RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch); - //Test that it will not out previously seen articles - queue.clear(); - batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); - assertEquals("Expected queue size to be 0", 0, queue.size()); - assertEquals("Expected batch size to be 20", 20, batch.size()); - RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch); - //Test that not seen urls aren't blocked. - queue.clear(); - batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist2.xml")); - assertEquals(batch.size(), queue.size()); - assertEquals("Expected queue size to be 25", 25, queue.size()); - assertEquals("Expected batch size to be 25", 25, batch.size()); - } + /** + * Test that perpetual streams will not output previously seen articles. + * @throws Exception Exception + */ + @Test + public void testPerpetualNoTimeFramePull() throws Exception { + com.healthmarketscience.common.util.resource.Handler.init(); + BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(); + RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", new DateTime().minusYears(5), 10000, true); + Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); + assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size()); + RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch); + //Test that it will not out previously seen articles + queue.clear(); + batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); + assertEquals("Expected queue size to be 0", 0, queue.size()); + assertEquals("Expected batch size to be 20", 20, batch.size()); + RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch); + //Test that not seen urls aren't blocked. + queue.clear(); + batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist2.xml")); + assertEquals(batch.size(), queue.size()); + assertEquals("Expected queue size to be 25", 25, queue.size()); + assertEquals("Expected batch size to be 25", 25, batch.size()); + } - /** - * Test that you can task will only output aritcles after a certain published time - * @throws Exception - */ - @Test - public void testNonPerpetualTimeFramedPull() throws Exception{ - com.healthmarketscience.common.util.resource.Handler.init(); - BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(); - DateTime publishedSince = new DateTime().withYear(2014).withDayOfMonth(5).withMonthOfYear(9).withZone(DateTimeZone.UTC); - RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", publishedSince, 10000, false); - Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); - assertEquals( 15, queue.size()); - assertEquals( 20 , batch.size()); - assertTrue( queue.size() < batch.size()); - RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch); - //Test that it will out previously seen articles - queue.clear(); - batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); - assertEquals( 15, queue.size()); - assertEquals( 20 , batch.size()); - assertTrue( queue.size() < batch.size()); - } + /** + * Test that you can task will only output aritcles after a certain published time. + * @throws Exception Exception + */ + @Test + public void testNonPerpetualTimeFramedPull() throws Exception { + com.healthmarketscience.common.util.resource.Handler.init(); + BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(); + DateTime publishedSince = new DateTime().withYear(2014).withDayOfMonth(5).withMonthOfYear(9).withZone(DateTimeZone.UTC); + RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", publishedSince, 10000, false); + Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); + assertEquals( 15, queue.size()); + assertEquals( 20 , batch.size()); + assertTrue( queue.size() < batch.size()); + RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch); + //Test that it will out previously seen articles + queue.clear(); + batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); + assertEquals( 15, queue.size()); + assertEquals( 20 , batch.size()); + assertTrue( queue.size() < batch.size()); + } - /** - * Test that task will only output articles after a certain published time that it has not seen before. - * @throws Exception - */ - @Test - public void testPerpetualTimeFramedPull() throws Exception { - com.healthmarketscience.common.util.resource.Handler.init(); - BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(); - DateTime publishedSince = new DateTime().withYear(2014).withDayOfMonth(5).withMonthOfYear(9).withZone(DateTimeZone.UTC); - RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", publishedSince, 10000, true); - Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); - assertEquals( 15, queue.size()); - assertEquals( 20 , batch.size()); - assertTrue( queue.size() < batch.size()); - RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch); - //Test that it will not out put previously seen articles - queue.clear(); - batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); - assertEquals( 0, queue.size()); - assertEquals( 20 , batch.size()); - assertTrue( queue.size() < batch.size()); - RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch); - - batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist2.xml")); - assertTrue( queue.size() < batch.size()); - assertEquals("Expected queue size to be 0", 3, queue.size()); - assertEquals("Expected batch size to be 0", 25, batch.size()); - } + /** + * Test that task will only output articles after a certain published time that it has not seen before. + * @throws Exception Exception + */ + @Test + public void testPerpetualTimeFramedPull() throws Exception { + com.healthmarketscience.common.util.resource.Handler.init(); + BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(); + DateTime publishedSince = new DateTime().withYear(2014).withDayOfMonth(5).withMonthOfYear(9).withZone(DateTimeZone.UTC); + RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", publishedSince, 10000, true); + Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); + assertEquals( 15, queue.size()); + assertEquals( 20 , batch.size()); + assertTrue( queue.size() < batch.size()); + RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch); + //Test that it will not out put previously seen articles + queue.clear(); + batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); + assertEquals( 0, queue.size()); + assertEquals( 20 , batch.size()); + assertTrue( queue.size() < batch.size()); + RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch); + + batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist2.xml")); + assertTrue( queue.size() < batch.size()); + assertEquals("Expected queue size to be 0", 3, queue.size()); + assertEquals("Expected batch size to be 0", 25, batch.size()); + }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java index 60b8e0f..08a58d3 100644 --- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java @@ -18,12 +18,13 @@ package org.apache.streams.rss.provider; -import com.carrotsearch.randomizedtesting.RandomizedTest; -import com.google.common.collect.Queues; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.rss.RssStreamConfiguration; import org.apache.streams.rss.provider.perpetual.RssFeedScheduler; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.google.common.collect.Queues; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,85 +37,87 @@ import java.util.concurrent.CountDownLatch; */ public class RssStreamProviderTest extends RandomizedTest { - private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderTest.class); - - @Test - public void testRssFeedShutdownsNonPerpetual() throws Exception { - RssStreamProvider provider = null; - try { - final CountDownLatch latch = new CountDownLatch(1); - BlockingQueue<StreamsDatum> datums = Queues.newLinkedBlockingQueue(); - provider = new RssStreamProvider(new RssStreamConfiguration()) { - @Override - protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> queue) { - return new MockScheduler(latch, queue); - } - }; - provider.prepare(null); - int datumCount = 0; - provider.startStream(); - while (!provider.scheduler.isComplete()) { - StreamsResultSet batch = provider.readCurrent(); - LOGGER.debug("Batch size : {}", batch.size()); - datumCount += batch.size(); - Thread.sleep(randomIntBetween(0, 3000)); - } - latch.await(); - - //one last pull incase of race condition - StreamsResultSet batch = provider.readCurrent(); - LOGGER.debug("Batch size : {}", batch.size()); - datumCount += batch.size(); - if(batch.size() != 0) { //if race condition happened, pull again - batch = provider.readCurrent(); - assertEquals(0, batch.size()); - } - - assertTrue(provider.scheduler.isComplete()); - assertEquals(20, datumCount); - assertFalse(provider.isRunning()); - assertEquals(0, datums.size()); - provider.cleanUp(); - } finally { - if(provider != null) - provider.cleanUp(); + private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderTest.class); + + @Test + public void testRssFeedShutdownsNonPerpetual() throws Exception { + RssStreamProvider provider = null; + try { + final CountDownLatch latch = new CountDownLatch(1); + BlockingQueue<StreamsDatum> datums = Queues.newLinkedBlockingQueue(); + provider = new RssStreamProvider(new RssStreamConfiguration()) { + @Override + protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> queue) { + return new MockScheduler(latch, queue); } + }; + provider.prepare(null); + int datumCount = 0; + provider.startStream(); + while (!provider.scheduler.isComplete()) { + StreamsResultSet batch = provider.readCurrent(); + LOGGER.debug("Batch size : {}", batch.size()); + datumCount += batch.size(); + Thread.sleep(randomIntBetween(0, 3000)); + } + latch.await(); + + //one last pull incase of race condition + StreamsResultSet batch = provider.readCurrent(); + LOGGER.debug("Batch size : {}", batch.size()); + datumCount += batch.size(); + if (batch.size() != 0) { + //if race condition happened, pull again + batch = provider.readCurrent(); + assertEquals(0, batch.size()); + } + + assertTrue(provider.scheduler.isComplete()); + assertEquals(20, datumCount); + assertFalse(provider.isRunning()); + assertEquals(0, datums.size()); + provider.cleanUp(); + } finally { + if (provider != null) { + provider.cleanUp(); + } } + } - private class MockScheduler extends RssFeedScheduler { + private class MockScheduler extends RssFeedScheduler { - private BlockingQueue<StreamsDatum> queue; - private CountDownLatch latch; - private volatile boolean complete = false; + private BlockingQueue<StreamsDatum> queue; + private CountDownLatch latch; + private volatile boolean complete = false; - public MockScheduler(CountDownLatch latch, BlockingQueue<StreamsDatum> dataQueue) { - super(null, null, dataQueue); - this.latch = latch; - this.queue = dataQueue; - } + public MockScheduler(CountDownLatch latch, BlockingQueue<StreamsDatum> dataQueue) { + super(null, null, dataQueue); + this.latch = latch; + this.queue = dataQueue; + } - @Override - public void run() { - try { - for (int i = 0; i < 20; ++i) { - this.queue.put(new StreamsDatum(null)); - Thread.sleep(randomIntBetween(0, 5000)); - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } finally { - this.complete = true; - this.latch.countDown(); - } + @Override + public void run() { + try { + for (int i = 0; i < 20; ++i) { + this.queue.put(new StreamsDatum(null)); + Thread.sleep(randomIntBetween(0, 5000)); } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } finally { + this.complete = true; + this.latch.countDown(); + } + } - @Override - public boolean isComplete() { - return this.complete; - } + @Override + public boolean isComplete() { + return this.complete; } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java index 2bd0b69..830f0e7 100644 --- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java @@ -15,12 +15,14 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.rss.provider.perpetual; -import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.rss.FeedDetails; import org.apache.streams.rss.provider.RssStreamProviderTask; + +import com.google.common.collect.Lists; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -34,7 +36,6 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * Unit tests for {@link org.apache.streams.rss.provider.perpetual.RssFeedScheduler} @@ -42,60 +43,60 @@ import static org.mockito.Mockito.when; public class RssFeedSchedulerTest { - /** - * Test that feeds are scheduled based on elapsed time correctly. - * Takes 1 minute to run. - */ - @Test - public void testScheduleFeeds() { - ExecutorService mockService = mock(ExecutorService.class); - final List<String> queuedTasks = new ArrayList<>(5); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - queuedTasks.add(((RssStreamProviderTask) invocationOnMock.getArguments()[0]).getRssFeed()); - return null; - } - }).when(mockService).execute(any(Runnable.class)); + /** + * Test that feeds are scheduled based on elapsed time correctly. + * Takes 1 minute to run. + */ + @Test + public void testScheduleFeeds() { + ExecutorService mockService = mock(ExecutorService.class); + final List<String> queuedTasks = new ArrayList<>(5); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + queuedTasks.add(((RssStreamProviderTask) invocationOnMock.getArguments()[0]).getRssFeed()); + return null; + } + }).when(mockService).execute(any(Runnable.class)); - RssFeedScheduler scheduler = new RssFeedScheduler(mockService, createFeedList(), new LinkedBlockingQueue<StreamsDatum>(), 1); - scheduler.scheduleFeeds(); - assertEquals("Expected 2 Feeds to be scheduled", 2, queuedTasks.size()); - assertEquals("Expected Feed 1 to be queued first", "1", queuedTasks.get(0)); - assertEquals("Expected Feed 2 to be queued second", "2", queuedTasks.get(1)); + RssFeedScheduler scheduler = new RssFeedScheduler(mockService, createFeedList(), new LinkedBlockingQueue<StreamsDatum>(), 1); + scheduler.scheduleFeeds(); + assertEquals("Expected 2 Feeds to be scheduled", 2, queuedTasks.size()); + assertEquals("Expected Feed 1 to be queued first", "1", queuedTasks.get(0)); + assertEquals("Expected Feed 2 to be queued second", "2", queuedTasks.get(1)); - safeSleep(1); - scheduler.scheduleFeeds(); - assertEquals("Only feed 1 should have been re-queued", 3, queuedTasks.size()); - assertEquals("Only feed 1 should have been re-queued", "1", queuedTasks.get(2)); + safeSleep(1); + scheduler.scheduleFeeds(); + assertEquals("Only feed 1 should have been re-queued", 3, queuedTasks.size()); + assertEquals("Only feed 1 should have been re-queued", "1", queuedTasks.get(2)); - safeSleep(60 * 1000); - scheduler.scheduleFeeds(); - assertEquals("Both feeds should have been re-queued", 5, queuedTasks.size()); - assertEquals("1", queuedTasks.get(3)); - assertEquals("2", queuedTasks.get(4)); - } + safeSleep(60 * 1000); + scheduler.scheduleFeeds(); + assertEquals("Both feeds should have been re-queued", 5, queuedTasks.size()); + assertEquals("1", queuedTasks.get(3)); + assertEquals("2", queuedTasks.get(4)); + } - private List<FeedDetails> createFeedList() { - List<FeedDetails> list = Lists.newLinkedList(); - FeedDetails fd = new FeedDetails(); - fd.setPollIntervalMillis(1L); - fd.setUrl("1"); - list.add(fd); + private List<FeedDetails> createFeedList() { + List<FeedDetails> list = Lists.newLinkedList(); + FeedDetails fd = new FeedDetails(); + fd.setPollIntervalMillis(1L); + fd.setUrl("1"); + list.add(fd); - fd = new FeedDetails(); - fd.setPollIntervalMillis( 60L * 1000); - fd.setUrl("2"); - list.add(fd); - return list; - } + fd = new FeedDetails(); + fd.setPollIntervalMillis( 60L * 1000); + fd.setUrl("2"); + list.add(fd); + return list; + } - private void safeSleep(long milliseconds) { - try { - Thread.sleep(milliseconds); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } + private void safeSleep(long milliseconds) { + try { + Thread.sleep(milliseconds); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java index ccac8aa..ccd8b74 100644 --- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java @@ -18,17 +18,16 @@ package org.apache.streams.rss.test; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.rss.FeedDetails; +import org.apache.streams.rss.RssStreamConfiguration; +import org.apache.streams.rss.provider.RssStreamProvider; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.rss.FeedDetails; -import org.apache.streams.rss.RssStreamConfiguration; -import org.apache.streams.rss.provider.RssStreamProvider; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -53,69 +52,68 @@ import static org.hamcrest.number.OrderingComparison.greaterThan; */ public class RssStreamProviderIT { - private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderIT.class); - - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - @Test - public void testRssStreamProvider() throws Exception { - - String configfile = "./target/test-classes/RssStreamProviderIT.conf"; - String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt"; - - InputStream is = RssStreamProviderIT.class.getResourceAsStream("/top100.txt"); - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); - - RssStreamConfiguration configuration = new RssStreamConfiguration(); - List<FeedDetails> feedArray = Lists.newArrayList(); - try { - while (br.ready()) { - String line = br.readLine(); - if(!StringUtils.isEmpty(line)) - { - feedArray.add(new FeedDetails().withUrl(line).withPollIntervalMillis(5000l)); - } - } - configuration.setFeeds(feedArray); - } catch( Exception e ) { - System.out.println(e); - e.printStackTrace(); - Assert.fail(); + private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderIT.class); + + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + @Test + public void testRssStreamProvider() throws Exception { + + final String configfile = "./target/test-classes/RssStreamProviderIT.conf"; + final String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt"; + + InputStream is = RssStreamProviderIT.class.getResourceAsStream("/top100.txt"); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + + RssStreamConfiguration configuration = new RssStreamConfiguration(); + List<FeedDetails> feedArray = Lists.newArrayList(); + try { + while (br.ready()) { + String line = br.readLine(); + if (!StringUtils.isEmpty(line)) { + feedArray.add(new FeedDetails().withUrl(line).withPollIntervalMillis(5000L)); } + } + configuration.setFeeds(feedArray); + } catch ( Exception ex ) { + System.out.println(ex); + ex.printStackTrace(); + Assert.fail(); + } - Assert.assertThat(configuration.getFeeds().size(), greaterThan(70)); + Assert.assertThat(configuration.getFeeds().size(), greaterThan(70)); - OutputStream os = new FileOutputStream(configfile); - OutputStreamWriter osw = new OutputStreamWriter(os); - BufferedWriter bw = new BufferedWriter(osw); + OutputStream os = new FileOutputStream(configfile); + OutputStreamWriter osw = new OutputStreamWriter(os); + BufferedWriter bw = new BufferedWriter(osw); - // write conf - ObjectNode feedsNode = mapper.convertValue(configuration, ObjectNode.class); - JsonNode configNode = mapper.createObjectNode().set("rss", feedsNode); + // write conf + ObjectNode feedsNode = mapper.convertValue(configuration, ObjectNode.class); + JsonNode configNode = mapper.createObjectNode().set("rss", feedsNode); - bw.write(mapper.writeValueAsString(configNode)); - bw.flush(); - bw.close(); + bw.write(mapper.writeValueAsString(configNode)); + bw.flush(); + bw.close(); - File config = new File(configfile); - assert (config.exists()); - assert (config.canRead()); - assert (config.isFile()); + File config = new File(configfile); + assert (config.exists()); + assert (config.canRead()); + assert (config.isFile()); - RssStreamProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); + RssStreamProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); - File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); - FileReader outReader = new FileReader(out); - LineNumberReader outCounter = new LineNumberReader(outReader); + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); - while(outCounter.readLine() != null) {} + while (outCounter.readLine() != null) {} - assert (outCounter.getLineNumber() >= 200); + assert (outCounter.getLineNumber() >= 200); - } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssTypeConverterTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssTypeConverterTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssTypeConverterTest.java index 9def7ac..37833c5 100644 --- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssTypeConverterTest.java +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssTypeConverterTest.java @@ -18,17 +18,18 @@ package org.apache.streams.rss.test; -import org.apache.commons.lang3.SerializationUtils; import org.apache.streams.rss.processor.RssTypeConverter; + +import org.apache.commons.lang3.SerializationUtils; import org.junit.Test; /** * Tests Serializability of {@link org.apache.streams.rss.processor.RssTypeConverter} */ public class RssTypeConverterTest { - @Test - public void testSerializability() { - RssTypeConverter converter = new RssTypeConverter(); - RssTypeConverter clone = SerializationUtils.clone(converter); - } + @Test + public void testSerializability() { + RssTypeConverter converter = new RssTypeConverter(); + RssTypeConverter clone = SerializationUtils.clone(converter); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java index b1d5f9d..01f1999 100644 --- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java @@ -18,15 +18,16 @@ package org.apache.streams.rss.test; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.pojo.json.Author; import org.apache.streams.pojo.json.Provider; import org.apache.streams.rss.serializer.SyndEntryActivitySerializer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Test; @@ -37,88 +38,94 @@ import java.net.URL; import java.util.List; import java.util.Scanner; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * Tests ability to convert SyndEntry ObjectNode form to {@link org.apache.streams.rss.processor.RssTypeConverter} form */ public class SyndEntryActivitySerializerIT { - private final static Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivitySerializerIT.class); - - private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - @Test - public void testJsonData() throws Exception { - Scanner scanner = new Scanner(this.getClass().getResourceAsStream("/TestSyndEntryJson.txt")); - List<Activity> activities = Lists.newLinkedList(); - List<ObjectNode> objects = Lists.newLinkedList(); - - SyndEntryActivitySerializer serializer = new SyndEntryActivitySerializer(); + private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivitySerializerIT.class); - while(scanner.hasNext()) { - String line = scanner.nextLine(); - LOGGER.debug(line); - ObjectNode node = (ObjectNode) mapper.readTree(line); + private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - objects.add(node); - activities.add(serializer.deserialize(node)); - } + @Test + public void testJsonData() throws Exception { + Scanner scanner = new Scanner(this.getClass().getResourceAsStream("/TestSyndEntryJson.txt")); + List<Activity> activities = Lists.newLinkedList(); + List<ObjectNode> objects = Lists.newLinkedList(); - assertEquals(11, activities.size()); + SyndEntryActivitySerializer serializer = new SyndEntryActivitySerializer(); - for(int x = 0; x < activities.size(); x ++) { - ObjectNode n = objects.get(x); - Activity a = activities.get(x); + while (scanner.hasNext()) { + String line = scanner.nextLine(); + LOGGER.debug(line); + ObjectNode node = (ObjectNode) mapper.readTree(line); - testActor(n.get("author").asText(), a.getActor()); - testAuthor(n.get("author").asText(), a.getObject().getAuthor()); - testProvider("id:providers:rss", "RSS", a.getProvider()); - testProviderUrl(a.getProvider()); - testVerb("post", a.getVerb()); - testPublished(n.get("publishedDate").asText(), a.getPublished()); - testUrl(n.get("uri").asText(), n.get("link").asText(), a); - } + objects.add(node); + activities.add(serializer.deserialize(node)); } - public void testVerb(String expected, String verb) { - assertEquals(expected, verb); - } + assertEquals(11, activities.size()); - public void testPublished(String expected, DateTime published) { - assertEquals(new DateTime(expected, DateTimeZone.UTC), published); - } + for (int x = 0; x < activities.size(); x++) { + ObjectNode objectNode = objects.get(x); + Activity activity = activities.get(x); - public void testActor(String expected, ActivityObject actor) { - assertEquals("id:rss:null" + ":" + expected, actor.getId()); - assertEquals(expected, actor.getDisplayName()); + testActor(objectNode.get("author").asText(), activity.getActor()); + testAuthor(objectNode.get("author").asText(), activity.getObject().getAuthor()); + testProvider("id:providers:rss", "RSS", activity.getProvider()); + validateProviderUrl(activity.getProvider()); + testVerb("post", activity.getVerb()); + testPublished(objectNode.get("publishedDate").asText(), activity.getPublished()); + testUrl(objectNode.get("uri").asText(), objectNode.get("link").asText(), activity); } - - public void testAuthor(String expected, Author author) { - assertEquals(expected, author.getDisplayName()); - assertEquals(expected, author.getId()); - } - - public void testProvider(String expectedId, String expectedDisplay, Provider provider) { - assertEquals(expectedId, provider.getId()); - assertEquals(expectedDisplay, provider.getDisplayName()); + } + + public void testVerb(String expected, String verb) { + assertEquals(expected, verb); + } + + public void testPublished(String expected, DateTime published) { + assertEquals(new DateTime(expected, DateTimeZone.UTC), published); + } + + public void testActor(String expected, ActivityObject actor) { + assertEquals("id:rss:null" + ":" + expected, actor.getId()); + assertEquals(expected, actor.getDisplayName()); + } + + public void testAuthor(String expected, Author author) { + assertEquals(expected, author.getDisplayName()); + assertEquals(expected, author.getId()); + } + + public void testProvider(String expectedId, String expectedDisplay, Provider provider) { + assertEquals(expectedId, provider.getId()); + assertEquals(expectedDisplay, provider.getDisplayName()); + } + + /** + * validate Provider Url. + * @param provider Provider + */ + public void validateProviderUrl(Provider provider) { + URL url = null; + + try { + url = new URL(provider.getUrl()); + url.toURI(); + } catch (Exception ex) { + LOGGER.error("Threw an exception while trying to validate URL: {} - {}", provider.getUrl(), ex); } - public void testProviderUrl(Provider provider) { - URL url = null; + assertNotNull(url); + } - try { - url = new URL(provider.getUrl()); - url.toURI(); - } catch(Exception e) { - LOGGER.error("Threw an exception while trying to validate URL: {} - {}", provider.getUrl(), e); - } - - assertNotNull(url); - } - - public void testUrl(String expectedURI, String expectedLink, Activity activity) { - assertTrue((expectedURI == activity.getUrl() || expectedLink == activity.getUrl())); - assertTrue((expectedURI == activity.getObject().getUrl() || expectedLink == activity.getObject().getUrl())); - } + public void testUrl(String expectedUri, String expectedLink, Activity activity) { + assertTrue((expectedUri == activity.getUrl() || expectedLink == activity.getUrl())); + assertTrue((expectedUri == activity.getObject().getUrl() || expectedLink == activity.getObject().getUrl())); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosException.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosException.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosException.java index a38e267..1480308 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosException.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosException.java @@ -20,44 +20,44 @@ package org.apache.streams.sysomos; /** - * Runtime exception wrapper for Sysomos Errors + * Runtime exception wrapper for Sysomos Errors. */ public class SysomosException extends RuntimeException { - private int errorCode = -1; + private int errorCode = -1; - public SysomosException() { - // TODO Auto-generated constructor stub - } + public SysomosException() { + // TODO Auto-generated constructor stub + } - public SysomosException(String arg0) { - super(arg0); - // TODO Auto-generated constructor stub - } + public SysomosException(String arg0) { + super(arg0); + // TODO Auto-generated constructor stub + } - public SysomosException(Throwable arg0) { - super(arg0); - // TODO Auto-generated constructor stub - } + public SysomosException(Throwable arg0) { + super(arg0); + // TODO Auto-generated constructor stub + } - public SysomosException(String arg0, Throwable arg1) { - super(arg0, arg1); - // TODO Auto-generated constructor stub - } + public SysomosException(String arg0, Throwable arg1) { + super(arg0, arg1); + // TODO Auto-generated constructor stub + } - public SysomosException(String arg0, int errorCode) { - super(arg0); - this.errorCode = errorCode; - } + public SysomosException(String arg0, int errorCode) { + super(arg0); + this.errorCode = errorCode; + } - public SysomosException(String arg0, Throwable arg1, int errorCode) { - super(arg0, arg1); - this.errorCode = errorCode; - } + public SysomosException(String arg0, Throwable arg1, int errorCode) { + super(arg0, arg1); + this.errorCode = errorCode; + } - public int getErrorCode() { - return this.errorCode; - } + public int getErrorCode() { + return this.errorCode; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java index 5d2a399..c0278cd 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java @@ -19,12 +19,13 @@ package org.apache.streams.sysomos.conversion; -import com.sysomos.xml.BeatApi; -import org.apache.commons.lang.StringUtils; import org.apache.streams.pojo.extensions.ExtensionUtil; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.pojo.json.Provider; + +import com.sysomos.xml.BeatApi; +import org.apache.commons.lang.StringUtils; import org.joda.time.DateTime; import java.util.HashMap; @@ -42,103 +43,109 @@ import static org.apache.streams.data.util.ActivityUtil.getProviderId; */ public class SysomosBeatActivityConverter { - private static final String LANGUAGE_KEY = "LANGUAGE"; - - public Activity convert(BeatApi.BeatResponse.Beat beat) { - Activity converted = new Activity(); - converted.setId(beat.getDocid()); - converted.setVerb("post"); - converted.setContent(beat.getContent()); - converted.setTitle(beat.getTitle()); - converted.setPublished(new DateTime(beat.getTime())); - converted.setUrl(beat.getLink()); - converted.setActor(new ActivityObject()); - Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags = mapTags(beat); - Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(converted); - extensions.put("keywords", beat.getContent()); - setLocation(beat, extensions); - setObject(beat, converted); - setProvider(beat, converted); - setLanguage(mappedTags, extensions); - extensions.put("sysomos", beat); - - setChannelSpecificValues(beat, converted, mappedTags); - - return converted; - } - - protected void setChannelSpecificValues(BeatApi.BeatResponse.Beat beat, Activity converted, Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags) { - String mediaType = beat.getMediaType(); - String lowerMediaType = mediaType.toLowerCase(); - ActivityObject actor = converted.getActor(); - ActivityObject object = converted.getObject(); - if ("TWITTER".equals(mediaType)) { - actor.setId(getPersonId(lowerMediaType, beat.getHost())); - actor.setDisplayName(beat.getHost()); - actor.setUrl("http://twitter.com/" + beat.getHost()); - object.setObjectType("tweet"); - object.setId(getObjectId(lowerMediaType, "tweet", beat.getTweetid())); - } else if ("FACEBOOK".equals(mediaType)) { - String fbid = mappedTags.containsKey("FBID") ? mappedTags.get("FBID").getValue() : ""; - actor.setId(getPersonId(lowerMediaType, fbid)); - actor.setDisplayName(beat.getTitle()); - actor.setUrl(beat.getHost()); - object.setObjectType("post"); - object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode()))); - } else { - actor.setId(null); - actor.setDisplayName(null); - actor.setUrl(null); - object.setObjectType("post"); - object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode()))); - } + private static final String LANGUAGE_KEY = "LANGUAGE"; + + /** + * convert BeatApi.BeatResponse.Beat to Activity + * @param beat BeatApi.BeatResponse.Beat + * @return Activity + */ + public Activity convert(BeatApi.BeatResponse.Beat beat) { + Activity converted = new Activity(); + converted.setId(beat.getDocid()); + converted.setVerb("post"); + converted.setContent(beat.getContent()); + converted.setTitle(beat.getTitle()); + converted.setPublished(new DateTime(beat.getTime())); + converted.setUrl(beat.getLink()); + converted.setActor(new ActivityObject()); + Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(converted); + extensions.put("keywords", beat.getContent()); + setLocation(beat, extensions); + setObject(beat, converted); + setProvider(beat, converted); + Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags = mapTags(beat); + setLanguage(mappedTags, extensions); + extensions.put("sysomos", beat); + + setChannelSpecificValues(beat, converted, mappedTags); + + return converted; + } + + protected void setChannelSpecificValues( + BeatApi.BeatResponse.Beat beat, + Activity converted, Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags) { + String mediaType = beat.getMediaType(); + String lowerMediaType = mediaType.toLowerCase(); + ActivityObject actor = converted.getActor(); + ActivityObject object = converted.getObject(); + if ("TWITTER".equals(mediaType)) { + actor.setId(getPersonId(lowerMediaType, beat.getHost())); + actor.setDisplayName(beat.getHost()); + actor.setUrl("http://twitter.com/" + beat.getHost()); + object.setObjectType("tweet"); + object.setId(getObjectId(lowerMediaType, "tweet", beat.getTweetid())); + } else if ("FACEBOOK".equals(mediaType)) { + String fbid = mappedTags.containsKey("FBID") ? mappedTags.get("FBID").getValue() : ""; + actor.setId(getPersonId(lowerMediaType, fbid)); + actor.setDisplayName(beat.getTitle()); + actor.setUrl(beat.getHost()); + object.setObjectType("post"); + object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode()))); + } else { + actor.setId(null); + actor.setDisplayName(null); + actor.setUrl(null); + object.setObjectType("post"); + object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode()))); } + } - protected void setLanguage(Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags, Map<String, Object> extensions) { - if(mappedTags.containsKey(LANGUAGE_KEY)) { - extensions.put(LANGUAGE_EXTENSION, mappedTags.get(LANGUAGE_KEY).getValue()); - } + protected void setLanguage(Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags, Map<String, Object> extensions) { + if (mappedTags.containsKey(LANGUAGE_KEY)) { + extensions.put(LANGUAGE_EXTENSION, mappedTags.get(LANGUAGE_KEY).getValue()); } - - protected void setObject(BeatApi.BeatResponse.Beat beat, Activity converted) { - ActivityObject object = new ActivityObject(); - converted.setObject(object); - object.setUrl(beat.getLink()); - object.setContent(beat.getContent()); - } - - @SuppressWarnings("unchecked") - protected void setLocation(BeatApi.BeatResponse.Beat beat, Map<String, Object> extensions) { - Map<String, Object> location; - String country = beat.getLocation().getCountry(); - if(StringUtils.isNotBlank(country)) { - if (extensions.containsKey(LOCATION_EXTENSION)) { - location = (Map<String, Object>) extensions.get(LOCATION_EXTENSION); - } else { - location = new HashMap<>(); - extensions.put(LOCATION_EXTENSION, location); - } - location.put(LOCATION_EXTENSION_COUNTRY, country); - } + } + + protected void setObject(BeatApi.BeatResponse.Beat beat, Activity converted) { + ActivityObject object = new ActivityObject(); + converted.setObject(object); + object.setUrl(beat.getLink()); + object.setContent(beat.getContent()); + } + + @SuppressWarnings("unchecked") + protected void setLocation(BeatApi.BeatResponse.Beat beat, Map<String, Object> extensions) { + Map<String, Object> location; + String country = beat.getLocation().getCountry(); + if (StringUtils.isNotBlank(country)) { + if (extensions.containsKey(LOCATION_EXTENSION)) { + location = (Map<String, Object>) extensions.get(LOCATION_EXTENSION); + } else { + location = new HashMap<>(); + extensions.put(LOCATION_EXTENSION, location); + } + location.put(LOCATION_EXTENSION_COUNTRY, country); } - - protected void setProvider(BeatApi.BeatResponse.Beat beat, Activity converted) { - Provider provider = new Provider(); - String mediaType = beat.getMediaType().toLowerCase(); - provider.setId(getProviderId(mediaType)); - provider.setDisplayName(StringUtils.capitalize(mediaType)); - converted.setProvider(provider); + } + + protected void setProvider(BeatApi.BeatResponse.Beat beat, Activity converted) { + Provider provider = new Provider(); + String mediaType = beat.getMediaType().toLowerCase(); + provider.setId(getProviderId(mediaType)); + provider.setDisplayName(StringUtils.capitalize(mediaType)); + converted.setProvider(provider); + } + + protected Map<String, BeatApi.BeatResponse.Beat.Tag> mapTags(BeatApi.BeatResponse.Beat beat) { + Map<String, BeatApi.BeatResponse.Beat.Tag> tags = new HashMap<>(); + for (BeatApi.BeatResponse.Beat.Tag tag : beat.getTag()) { + if (tag.getSystemType() != null) { + tags.put(tag.getSystemType().trim(), tag); + } } - - protected Map<String, BeatApi.BeatResponse.Beat.Tag> mapTags(BeatApi.BeatResponse.Beat beat) { - Map<String, BeatApi.BeatResponse.Beat.Tag> tags = new HashMap<>(); - for(BeatApi.BeatResponse.Beat.Tag tag : beat.getTag()) { - if(tag.getSystemType() != null) { - tags.put(tag.getSystemType().trim(), tag); - } - } - return tags; - } - + return tags; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/HeartbeatInfo.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/HeartbeatInfo.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/HeartbeatInfo.java index 5915140..eede9f4 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/HeartbeatInfo.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/HeartbeatInfo.java @@ -16,122 +16,116 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.sysomos.data; -import org.apache.streams.sysomos.data.SysomosTagDefinition; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; import org.w3c.dom.NodeList; import org.xml.sax.InputSource; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; import java.io.ByteArrayInputStream; import java.util.ArrayList; import java.util.List; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; /** - * Represents Heatbeat metadata from the Sysomos API + * Represents Heatbeat metadata from the Sysomos API. */ public class HeartbeatInfo { - private Document doc; - private List<SysomosTagDefinition> tags; - - public HeartbeatInfo(String xmlString) throws Exception { - DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); - DocumentBuilder docBuilder = dbFactory.newDocumentBuilder(); - this.doc = docBuilder.parse(new InputSource(new ByteArrayInputStream(xmlString.getBytes("utf-8")))); - this.tags = new ArrayList<SysomosTagDefinition>(); - createTagDefinitions(); - } - - - private void createTagDefinitions() { - this.tags = new ArrayList<SysomosTagDefinition>(); - NodeList tagList = this.doc.getElementsByTagName("tag"); - - for(int i=0; i < tagList.getLength(); ++i) { - Node tag = tagList.item(i); - SysomosTagDefinition tagDefinition = createSysomosTagDefinitionFromNode(tag); - if(this.hasTagName(tagDefinition.getTagName())) { - SysomosTagDefinition otherTag = this.getTagWithTagName(tagDefinition.getTagName()); - if(!otherTag.getDisplayName().equals(tagDefinition.getDisplayName())) { - throw new RuntimeException("A single tag ("+otherTag.getTagName()+") has multiple display names ("+otherTag.getDisplayName()+" , "+tagDefinition.getDisplayName()+")"); - } - else { - List<String> otherQueries = otherTag.getQueries(); - for(String query : tagDefinition.getQueries()) { - if(!otherQueries.contains(query)) { - otherTag.addQuery(query); - } - } - } - } - else { - this.tags.add(tagDefinition); - } - - } - } - - private SysomosTagDefinition createSysomosTagDefinitionFromNode(Node tag) { - Element tagElement = (Element) tag; - SysomosTagDefinition tagDefinition = new SysomosTagDefinition(tagElement.getElementsByTagName("name").item(0).getTextContent(), - tagElement.getElementsByTagName("displayName").item(0).getTextContent()); - NodeList taggingRule = tagElement.getElementsByTagName("taggingRule"); - for(int i=0; i < taggingRule.getLength(); ++i) { - Element rule = (Element) taggingRule.item(i); - NodeList queries = rule.getElementsByTagName("query"); - for(int j=0; j < queries.getLength(); ++j) { - Element query = (Element) queries.item(j); - tagDefinition.addQuery(query.getTextContent()); + private Document doc; + private List<SysomosTagDefinition> tags; + + /** + * HeartbeatInfo constructor. + * @param xmlString xmlString + * @throws Exception Exception + */ + public HeartbeatInfo(String xmlString) throws Exception { + DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder docBuilder = dbFactory.newDocumentBuilder(); + this.doc = docBuilder.parse(new InputSource(new ByteArrayInputStream(xmlString.getBytes("utf-8")))); + this.tags = new ArrayList<SysomosTagDefinition>(); + createTagDefinitions(); + } + + private void createTagDefinitions() { + this.tags = new ArrayList<SysomosTagDefinition>(); + NodeList tagList = this.doc.getElementsByTagName("tag"); + + for (int i = 0; i < tagList.getLength(); ++i) { + Node tag = tagList.item(i); + SysomosTagDefinition tagDefinition = createSysomosTagDefinitionFromNode(tag); + if (this.hasTagName(tagDefinition.getTagName())) { + SysomosTagDefinition otherTag = this.getTagWithTagName(tagDefinition.getTagName()); + if (!otherTag.getDisplayName().equals(tagDefinition.getDisplayName())) { + throw new RuntimeException("A single tag (" + + otherTag.getTagName() + + ") has multiple display names (" + + otherTag.getDisplayName() + + " , " + + tagDefinition.getDisplayName() + + ")"); + } else { + List<String> otherQueries = otherTag.getQueries(); + for (String query : tagDefinition.getQueries()) { + if (!otherQueries.contains(query)) { + otherTag.addQuery(query); } + } } - return tagDefinition; - } + } else { + this.tags.add(tagDefinition); + } - public boolean hasTagName(String tagName) { - for(SysomosTagDefinition tag : this.tags) { - if(tag.hasTagName(tagName)) { - return true; - } - } - return false; } - - public SysomosTagDefinition getTagWithTagName(String tagName) { - for(SysomosTagDefinition tag : this.tags) { - if(tag.hasTagName(tagName)) { - return tag; - } - } - return null; + } + + private SysomosTagDefinition createSysomosTagDefinitionFromNode(Node tag) { + Element tagElement = (Element) tag; + SysomosTagDefinition tagDefinition = new SysomosTagDefinition(tagElement.getElementsByTagName("name").item(0).getTextContent(), + tagElement.getElementsByTagName("displayName").item(0).getTextContent()); + NodeList taggingRule = tagElement.getElementsByTagName("taggingRule"); + for (int i = 0; i < taggingRule.getLength(); ++i) { + Element rule = (Element) taggingRule.item(i); + NodeList queries = rule.getElementsByTagName("query"); + for (int j = 0; j < queries.getLength(); ++j) { + Element query = (Element) queries.item(j); + tagDefinition.addQuery(query.getTextContent()); + } } - - public boolean hasTagWithDisplayName(String displayName) { - for(SysomosTagDefinition tag : this.tags) { - if(tag.hasDisplayName(displayName)) { - return true; - } - } - return false; + return tagDefinition; + } + + /** + * hasTagName. + * @param tagName tagName + * @return hasTagName + */ + public boolean hasTagName(String tagName) { + for (SysomosTagDefinition tag : this.tags) { + if (tag.hasTagName(tagName)) { + return true; + } } - - public SysomosTagDefinition getTagWithDisplayName(String displayName) { - for(SysomosTagDefinition tag : this.tags) { - if(tag.hasDisplayName(displayName)) { - return tag; - } - } - return null; - } - - public List<SysomosTagDefinition> getTagDefinitions() { - List<SysomosTagDefinition> result = new ArrayList<SysomosTagDefinition>(); - result.addAll(this.tags); - return result; + return false; + } + + /** + * getTagWithTagName. + * @param tagName tagName + * @return SysomosTagDefinition + */ + public SysomosTagDefinition getTagWithTagName(String tagName) { + for (SysomosTagDefinition tag : this.tags) { + if (tag.hasTagName(tagName)) { + return tag; + } } + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/SysomosTagDefinition.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/SysomosTagDefinition.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/SysomosTagDefinition.java index a7a8cd4..889db31 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/SysomosTagDefinition.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/SysomosTagDefinition.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.sysomos.data; import java.util.ArrayList; @@ -26,66 +27,75 @@ import java.util.List; */ public class SysomosTagDefinition { - private String tagName; - private String displayName; - private List<String> queries; + private String tagName; + private String displayName; + private List<String> queries; - public SysomosTagDefinition(String tagName, String displayName) { - this.tagName = tagName; - this.displayName = displayName; - this.queries = new ArrayList<String>(); - } + /** + * SysomosTagDefinition constructor. + * @param tagName tagName + * @param displayName displayName + */ + public SysomosTagDefinition(String tagName, String displayName) { + this.tagName = tagName; + this.displayName = displayName; + this.queries = new ArrayList<String>(); + } - public String getTagName() { - return this.tagName; - } + public String getTagName() { + return this.tagName; + } - public String getDisplayName() { - return this.displayName; - } + public String getDisplayName() { + return this.displayName; + } - public List<String> getQueries() { - List<String> result = new ArrayList<String>(); - result.addAll(this.queries); - return result; - } + /** + * getQueries. + * @return Queries + */ + public List<String> getQueries() { + List<String> result = new ArrayList<String>(); + result.addAll(this.queries); + return result; + } - public void addQuery(String query) { - this.queries.add(query); - } + public void addQuery(String query) { + this.queries.add(query); + } - public boolean hasTagName(String tagName) { - return this.tagName.equals(tagName); - } + public boolean hasTagName(String tagName) { + return this.tagName.equals(tagName); + } - public boolean hasQuery(String query) { - return this.queries.contains(query); - } + public boolean hasQuery(String query) { + return this.queries.contains(query); + } - public boolean hasDisplayName(String displayName) { - return this.displayName.equals(displayName); - } + public boolean hasDisplayName(String displayName) { + return this.displayName.equals(displayName); + } - @Override - public boolean equals(Object o) { - if(!(o instanceof SysomosTagDefinition)) { - return false; - } - SysomosTagDefinition that = (SysomosTagDefinition) o; - if(!this.tagName.equals(that.tagName)) { - return false; - } - if(!this.displayName.equals(that.displayName)) { - return false; - } - if(this.queries.size() != that.queries.size()) { - return false; - } - for(int i=0; i < this.queries.size(); ++i) { - if(!that.queries.contains(this.queries.get(i))) { - return false; - } - } - return true; + @Override + public boolean equals(Object object) { + if (!(object instanceof SysomosTagDefinition)) { + return false; + } + SysomosTagDefinition that = (SysomosTagDefinition) object; + if (!this.tagName.equals(that.tagName)) { + return false; + } + if (!this.displayName.equals(that.displayName)) { + return false; + } + if (this.queries.size() != that.queries.size()) { + return false; + } + for (int i = 0; i < this.queries.size(); ++i) { + if (!that.queries.contains(this.queries.get(i))) { + return false; + } } + return true; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java index 3ffd0b3..48615eb 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java @@ -19,45 +19,46 @@ package org.apache.streams.sysomos.processor; -import com.google.common.collect.Lists; -import com.sysomos.xml.BeatApi; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.sysomos.conversion.SysomosBeatActivityConverter; +import com.google.common.collect.Lists; +import com.sysomos.xml.BeatApi; + import java.util.List; /** - * Stream processor that converts Sysomos type to Activity + * Stream processor that converts Sysomos type to Activity. */ public class SysomosTypeConverter implements StreamsProcessor { - public final static String STREAMS_ID = "SysomosTypeConverter"; + public static final String STREAMS_ID = "SysomosTypeConverter"; - private SysomosBeatActivityConverter converter; + private SysomosBeatActivityConverter converter; - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - if(entry.getDocument() instanceof BeatApi.BeatResponse.Beat) { - entry.setDocument(converter.convert((BeatApi.BeatResponse.Beat)entry.getDocument())); - return Lists.newArrayList(entry); - } else { - return Lists.newArrayList(); - } + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + if (entry.getDocument() instanceof BeatApi.BeatResponse.Beat) { + entry.setDocument(converter.convert((BeatApi.BeatResponse.Beat)entry.getDocument())); + return Lists.newArrayList(entry); + } else { + return Lists.newArrayList(); } + } - @Override - public void prepare(Object configurationObject) { - converter = new SysomosBeatActivityConverter(); - } + @Override + public void prepare(Object configurationObject) { + converter = new SysomosBeatActivityConverter(); + } - @Override - public void cleanUp() { - //NOP - } + @Override + public void cleanUp() { + //NOP + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java index 28f33df..de604b4 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java @@ -19,40 +19,41 @@ package org.apache.streams.sysomos.provider; -import com.sysomos.xml.BeatApi; -import com.sysomos.xml.ObjectFactory; import org.apache.streams.sysomos.SysomosException; import org.apache.streams.sysomos.util.SysomosUtils; + +import com.sysomos.xml.BeatApi; +import com.sysomos.xml.ObjectFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.StringReader; +import java.net.URL; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; import javax.xml.bind.Unmarshaller; -import java.io.StringReader; -import java.net.URL; /** * Defines a common pattern for requesting data from the Sysomos API. */ public abstract class AbstractRequestBuilder implements RequestBuilder { - private final static Logger LOGGER = LoggerFactory.getLogger(AbstractRequestBuilder.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRequestBuilder.class); - /** - * Executes the request to the Sysomos Heartbeat API and returns a valid response - */ - public BeatApi.BeatResponse execute() { - URL url = this.getRequestUrl(); - try { - String xmlResponse = SysomosUtils.queryUrl(url); - JAXBContext context = JAXBContext.newInstance(ObjectFactory.class); - Unmarshaller unmarshaller = context.createUnmarshaller(); - BeatApi beatApi = (BeatApi) unmarshaller.unmarshal(new StringReader(xmlResponse)); - return beatApi.getBeatResponse(); - } catch (JAXBException e) { - LOGGER.error("Unable to unmarshal XML content"); - throw new SysomosException("Unable to unmarshal XML content", e); - } + /** + * Executes the request to the Sysomos Heartbeat API and returns a valid response. + */ + public BeatApi.BeatResponse execute() { + URL url = this.getRequestUrl(); + try { + String xmlResponse = SysomosUtils.queryUrl(url); + JAXBContext context = JAXBContext.newInstance(ObjectFactory.class); + Unmarshaller unmarshaller = context.createUnmarshaller(); + BeatApi beatApi = (BeatApi) unmarshaller.unmarshal(new StringReader(xmlResponse)); + return beatApi.getBeatResponse(); + } catch (JAXBException ex) { + LOGGER.error("Unable to unmarshal XML content"); + throw new SysomosException("Unable to unmarshal XML content", ex); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java index 178014a..7ae47cf 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java @@ -20,6 +20,7 @@ package org.apache.streams.sysomos.provider; import org.apache.streams.sysomos.SysomosException; + import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +28,7 @@ import org.slf4j.LoggerFactory; import java.net.MalformedURLException; import java.net.URL; -import static org.apache.streams.sysomos.util.SysomosUtils.*; +import static org.apache.streams.sysomos.util.SysomosUtils.SYSOMOS_DATE_FORMATTER; /** * Builds requests for the Sysomos Heartbeat Content API. This is the preferred method of @@ -35,102 +36,102 @@ import static org.apache.streams.sysomos.util.SysomosUtils.*; */ public class ContentRequestBuilder extends AbstractRequestBuilder implements RequestBuilder { - private final static Logger LOGGER = LoggerFactory.getLogger(ContentRequestBuilder.class); - - private String baseUrl; - private String hid; - private String addedAfter; - private String addedBefore; - private String size; - private String offset; - private String apiKey; - - /** - * The max number of items you are allowed to get per request. - */ - public static final int MAX_ALLOWED_PER_REQUEST = 10000; - - /** - * Constructs a new ContentRequestBuilder for the specified API key and Sysomos URL - * @param baseUrl the base URL for the sysomos API - * @param apiKey the API key generated by Sysomos for authorization - */ - protected ContentRequestBuilder(String baseUrl, String apiKey) { - this.baseUrl = baseUrl; - this.apiKey = apiKey; - } - - /** - * Gets the Request URL based on the local fields - * @return a valid URL for the Sysomos API or an exception - */ - @Override - public URL getRequestUrl() { - StringBuilder url = new StringBuilder(); - url.append(this.baseUrl); - url.append("v1/heartbeat/content?"); - url.append("apiKey="); - url.append(this.apiKey); - url.append("&hid="); - url.append(this.hid); - if (size != null) { - url.append("&size="); - url.append(this.size); - } - if (this.addedAfter != null) { - url.append("&addedAfter="); - url.append(this.addedAfter); - } - if (this.addedBefore != null) { - url.append("&addedBefore="); - url.append(this.addedBefore); - } - if (this.offset != null) { - url.append("&offset="); - url.append(this.offset); - } - String urlString = url.toString(); - LOGGER.debug("Constructed Request URL: {}", urlString); - try { - return new URL(urlString); - } catch (MalformedURLException e) { - throw new SysomosException("Malformed Request URL. Check Request Builder parameters", e); - } + private static final Logger LOGGER = LoggerFactory.getLogger(ContentRequestBuilder.class); + + private String baseUrl; + private String hid; + private String addedAfter; + private String addedBefore; + private String size; + private String offset; + private String apiKey; + + /** + * The max number of items you are allowed to get per request. + */ + public static final int MAX_ALLOWED_PER_REQUEST = 10000; + + /** + * Constructs a new ContentRequestBuilder for the specified API key and Sysomos URL. + * @param baseUrl the base URL for the sysomos API + * @param apiKey the API key generated by Sysomos for authorization + */ + protected ContentRequestBuilder(String baseUrl, String apiKey) { + this.baseUrl = baseUrl; + this.apiKey = apiKey; + } + + /** + * Gets the Request URL based on the local fields. + * @return a valid URL for the Sysomos API or an exception + */ + @Override + public URL getRequestUrl() { + StringBuilder url = new StringBuilder(); + url.append(this.baseUrl); + url.append("v1/heartbeat/content?"); + url.append("apiKey="); + url.append(this.apiKey); + url.append("&hid="); + url.append(this.hid); + if (size != null) { + url.append("&size="); + url.append(this.size); } - - @Override - public RequestBuilder setHeartBeatId(int hid) { - return setHeartBeatId(Integer.toString(hid)); + if (this.addedAfter != null) { + url.append("&addedAfter="); + url.append(this.addedAfter); } - - @Override - public RequestBuilder setHeartBeatId(String hid) { - this.hid = hid; - return this; + if (this.addedBefore != null) { + url.append("&addedBefore="); + url.append(this.addedBefore); } - - @Override - public RequestBuilder setAddedAfterDate(DateTime afterDate) { - this.addedAfter = SYSOMOS_DATE_FORMATTER.print(afterDate); - return this; + if (this.offset != null) { + url.append("&offset="); + url.append(this.offset); } - - @Override - public RequestBuilder setAddedBeforeDate(DateTime beforeDate) { - this.addedBefore = SYSOMOS_DATE_FORMATTER.print(beforeDate); - return this; - } - - @Override - public RequestBuilder setReturnSetSize(long size) { - this.size = Long.toString(Math.min(size, MAX_ALLOWED_PER_REQUEST)); - return this; - } - - @Override - public RequestBuilder setOffset(long offset) { - this.offset = Long.toString(offset); - return this; + String urlString = url.toString(); + LOGGER.debug("Constructed Request URL: {}", urlString); + try { + return new URL(urlString); + } catch (MalformedURLException ex) { + throw new SysomosException("Malformed Request URL. Check Request Builder parameters", ex); } + } + + @Override + public RequestBuilder setHeartBeatId(int hid) { + return setHeartBeatId(Integer.toString(hid)); + } + + @Override + public RequestBuilder setHeartBeatId(String hid) { + this.hid = hid; + return this; + } + + @Override + public RequestBuilder setAddedAfterDate(DateTime afterDate) { + this.addedAfter = SYSOMOS_DATE_FORMATTER.print(afterDate); + return this; + } + + @Override + public RequestBuilder setAddedBeforeDate(DateTime beforeDate) { + this.addedBefore = SYSOMOS_DATE_FORMATTER.print(beforeDate); + return this; + } + + @Override + public RequestBuilder setReturnSetSize(long size) { + this.size = Long.toString(Math.min(size, MAX_ALLOWED_PER_REQUEST)); + return this; + } + + @Override + public RequestBuilder setOffset(long offset) { + this.offset = Long.toString(offset); + return this; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java index 0e12025..53887af 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java @@ -25,72 +25,73 @@ import org.joda.time.DateTime; import java.net.URL; /** - * Simplifying abstraction that aids in building a request to the Sysomos API in a chained fashion + * Simplifying abstraction that aids in building a request to the Sysomos API in a chained fashion. */ public interface RequestBuilder { - /** - * Sets the date after which documents should be returned from Sysomos - * @param afterDate the {@link org.joda.time.DateTime} instance representing the after date - * - * @return The RequestBuilder for continued Chaining - */ - RequestBuilder setAddedAfterDate(DateTime afterDate); - /** - * Sets the date before which documents should be returned from Sysomos - * @param beforeDate the {@link org.joda.time.DateTime} instance representing the before date - * - * @return The RequestBuilder for continued Chaining - */ - RequestBuilder setAddedBeforeDate(DateTime beforeDate); + /** + * Sets the date after which documents should be returned from Sysomos. + * @param afterDate the {@link org.joda.time.DateTime} instance representing the after date + * + * @return The RequestBuilder for continued Chaining + */ + RequestBuilder setAddedAfterDate(DateTime afterDate); - /** - * Sets the size of the expected response - * @param size the number of documents - * - * @return The RequestBuilder for continued Chaining - */ - RequestBuilder setReturnSetSize(long size); + /** + * Sets the date before which documents should be returned from Sysomos. + * @param beforeDate the {@link org.joda.time.DateTime} instance representing the before date + * + * @return The RequestBuilder for continued Chaining + */ + RequestBuilder setAddedBeforeDate(DateTime beforeDate); - /** - * Sets the starting offset for the number of documents given the other parameters - * @param offset the starting offset - * - * @return The RequestBuilder for continued Chaining - */ - RequestBuilder setOffset(long offset); + /** + * Sets the size of the expected response. + * @param size the number of documents + * + * @return The RequestBuilder for continued Chaining + */ + RequestBuilder setReturnSetSize(long size); - /** - * Sets the Sysomos Heartbeat ID - * @param hid Heartbeat ID - * - * @return The RequestBuilder for continued Chaining - */ - RequestBuilder setHeartBeatId(int hid); + /** + * Sets the starting offset for the number of documents given the other parameters. + * @param offset the starting offset + * + * @return The RequestBuilder for continued Chaining + */ + RequestBuilder setOffset(long offset); - /** - * - * Sets the Sysomos Heartbeat ID as a String - * @param hid Heartbeat ID string - * - * @return The RequestBuilder for continued Chaining - */ - RequestBuilder setHeartBeatId(String hid); + /** + * Sets the Sysomos Heartbeat ID. + * @param hid Heartbeat ID + * + * @return The RequestBuilder for continued Chaining + */ + RequestBuilder setHeartBeatId(int hid); - /** - * Returns the full url need to execute a request. - * - * Example: - * http://api.sysomos.com/dev/v1/heartbeat/content?apiKey=YOUR - * -APIKEY&hid=YOUR-HEARTBEAT-ID&offset=0&size=10& - * addedAfter=2010-10-15T13:00:00Z&addedBefore=2010-10-18T13:00:00Z - * - * @return the URL to use when making requests of Sysomos Heartbeat - */ - URL getRequestUrl(); + /** + * Sets the Sysomos Heartbeat ID as a String. + * @param hid Heartbeat ID string + * + * @return The RequestBuilder for continued Chaining + */ + RequestBuilder setHeartBeatId(String hid); - /** - * Executes the request to the Sysomos Heartbeat API and returns a valid response - */ - BeatApi.BeatResponse execute(); + /** + * Returns the full url need to execute a request. + * + * <p/> + * Example: + * http://api.sysomos.com/dev/v1/heartbeat/content?apiKey=YOUR + * -APIKEY&hid=YOUR-HEARTBEAT-ID&offset=0&size=10& + * addedAfter=2010-10-15T13:00:00Z&addedBefore=2010-10-18T13:00:00Z + * + * @return the URL to use when making requests of Sysomos Heartbeat + */ + URL getRequestUrl(); + + /** + * Executes the request to the Sysomos Heartbeat API and returns a valid response + */ + BeatApi.BeatResponse execute(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java index 488b6c7..6b59d1e 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java @@ -30,26 +30,19 @@ import java.net.URL; */ public class SysomosClient { - public static final String BASE_URL_STRING = "http://api.sysomos.com/"; - private static final String HEARTBEAT_INFO_URL = "http://api.sysomos.com/v1/heartbeat/info?apiKey={api_key}&hid={hid}"; + public static final String BASE_URL_STRING = "http://api.sysomos.com/"; + private static final String HEARTBEAT_INFO_URL = "http://api.sysomos.com/v1/heartbeat/info?apiKey={api_key}&hid={hid}"; - private String apiKey; + private String apiKey; - private HttpURLConnection client; + private HttpURLConnection client; - public SysomosClient(String apiKey) { - this.apiKey = apiKey; - } + public SysomosClient(String apiKey) { + this.apiKey = apiKey; + } - public HeartbeatInfo getHeartbeatInfo(String hid) throws Exception { - String urlString = HEARTBEAT_INFO_URL.replace("{api_key}", this.apiKey); - urlString = urlString.replace("{hid}", hid); - String xmlResponse = SysomosUtils.queryUrl(new URL(urlString)); - return new HeartbeatInfo(xmlResponse); - } - - public RequestBuilder createRequestBuilder() { - return new ContentRequestBuilder(BASE_URL_STRING, this.apiKey); - } + public RequestBuilder createRequestBuilder() { + return new ContentRequestBuilder(BASE_URL_STRING, this.apiKey); + } }
