http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-pojo/src/test/java/org/apache/streams/pojo/test/RFC3339UtilsTest.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/test/java/org/apache/streams/pojo/test/RFC3339UtilsTest.java b/streams-pojo/src/test/java/org/apache/streams/pojo/test/RFC3339UtilsTest.java index b44e72a..bce756a 100644 --- a/streams-pojo/src/test/java/org/apache/streams/pojo/test/RFC3339UtilsTest.java +++ b/streams-pojo/src/test/java/org/apache/streams/pojo/test/RFC3339UtilsTest.java @@ -19,6 +19,7 @@ package org.apache.streams.pojo.test; import org.apache.streams.data.util.RFC3339Utils; + import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Test; @@ -38,189 +39,190 @@ import static org.junit.Assert.fail; */ public class RFC3339UtilsTest { - @Test - public void validUTC() { - DateTime parsed = parseUTC("2014-12-25T12:00:00Z"); - assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); - assertThat(parsed.hourOfDay().get(), is(equalTo(12))); - assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); - assertThat(parsed.monthOfYear().get(), is(equalTo(12))); - } - - @Test - public void validUTCSubSecond() { - DateTime parsed = parseUTC("2014-12-25T12:00:00.7Z"); - assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); - assertThat(parsed.hourOfDay().get(), is(equalTo(12))); - assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); - assertThat(parsed.monthOfYear().get(), is(equalTo(12))); - assertThat(parsed.millisOfSecond().get(), is(equalTo(700))); - } - - @Test - public void validUTCSubSecondMultiDigit() { - DateTime parsed = parseUTC("2014-12-25T12:00:00.7343Z"); - assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); - assertThat(parsed.hourOfDay().get(), is(equalTo(12))); - assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); - assertThat(parsed.monthOfYear().get(), is(equalTo(12))); - assertThat(parsed.millisOfSecond().get(), is(equalTo(734))); - } - - @Test - public void validEST() { - DateTime parsed = parseUTC("2014-12-25T12:00:00-05:00"); - assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); - assertThat(parsed.hourOfDay().get(), is(equalTo(17))); - assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); - assertThat(parsed.monthOfYear().get(), is(equalTo(12))); - } - - @Test - public void validESTSubSecond() { - DateTime parsed = parseUTC("2014-12-25T12:00:00.7-05:00"); - assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); - assertThat(parsed.hourOfDay().get(), is(equalTo(17))); - assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); - assertThat(parsed.monthOfYear().get(), is(equalTo(12))); - assertThat(parsed.millisOfSecond().get(), is(equalTo(700))); - } - - @Test - public void validESTSubSecondMultiDigit() { - DateTime parsed = parseUTC("2014-12-25T12:00:00.7343-05:00"); - assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); - assertThat(parsed.hourOfDay().get(), is(equalTo(17))); - assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); - assertThat(parsed.monthOfYear().get(), is(equalTo(12))); - assertThat(parsed.millisOfSecond().get(), is(equalTo(734))); - } - - @Test - public void validESTNoSeparator() { - DateTime parsed = parseUTC("2014-12-25T12:00:00-0500"); - assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); - assertThat(parsed.hourOfDay().get(), is(equalTo(17))); - assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); - assertThat(parsed.monthOfYear().get(), is(equalTo(12))); - } - - @Test - public void validESTSubSecondNoSeparator() { - DateTime parsed = parseUTC("2014-12-25T12:00:00.7-0500"); - assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); - assertThat(parsed.hourOfDay().get(), is(equalTo(17))); - assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); - assertThat(parsed.monthOfYear().get(), is(equalTo(12))); - assertThat(parsed.millisOfSecond().get(), is(equalTo(700))); - } - - @Test - public void validESTSubSecondMultiDigitNoSeparator() { - DateTime parsed = parseUTC("2014-12-25T12:00:00.7343-0500"); - assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); - assertThat(parsed.hourOfDay().get(), is(equalTo(17))); - assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); - assertThat(parsed.monthOfYear().get(), is(equalTo(12))); - assertThat(parsed.millisOfSecond().get(), is(equalTo(734))); - } - - @Test - public void validCET() { - DateTime parsed = parseUTC("2014-12-25T12:00:00+01:00"); - assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); - assertThat(parsed.hourOfDay().get(), is(equalTo(11))); - assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); - assertThat(parsed.monthOfYear().get(), is(equalTo(12))); - } - - @Test - public void validCETSubSecond() { - DateTime parsed = parseUTC("2014-12-25T12:00:00.7+01:00"); - assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); - assertThat(parsed.hourOfDay().get(), is(equalTo(11))); - assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); - assertThat(parsed.monthOfYear().get(), is(equalTo(12))); - assertThat(parsed.millisOfSecond().get(), is(equalTo(700))); - } - - @Test - public void validCETSubSecondMultidigit() { - DateTime parsed = parseUTC("2014-12-25T12:00:00.7343+01:00"); - assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); - assertThat(parsed.hourOfDay().get(), is(equalTo(11))); - assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); - assertThat(parsed.monthOfYear().get(), is(equalTo(12))); - assertThat(parsed.millisOfSecond().get(), is(equalTo(734))); - } - - @Test - public void validLong() { - DateTime parsed = parseUTC("1419505200734"); - assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); - assertThat(parsed.hourOfDay().get(), is(equalTo(11))); - assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); - assertThat(parsed.monthOfYear().get(), is(equalTo(12))); - assertThat(parsed.millisOfSecond().get(), is(equalTo(734))); - } - - @Test - public void validFormatUTC() { - DateTime parsed = new DateTime(1419505200734L); - assertThat(format(parsed), is(equalTo("2014-12-25T11:00:00.734Z"))); - } - - @Test - public void validFormat() { - TimeZone cet = TimeZone.getTimeZone("CET"); - DateTime parsed = new DateTime(1419505200734L); - assertThat(format(parsed, cet), is(equalTo("2014-12-25T12:00:00.734+0100"))); - } - - @Test - public void testParseVariousDateFormats() { - String date = "Thu April 24 04:43:10 -0500 2014"; - DateTime expected = new DateTime(2014, 4, 24, 9, 43, 10, DateTimeZone.forOffsetHours(0)); - testHelper(expected, date); - date = "2014/04/24 04:43:10"; - expected = new DateTime(2014, 4, 24, 4, 43, 10, DateTimeZone.forOffsetHours(0)); - testHelper(expected, date); - date = "2014-04-24T04:43:10Z"; - testHelper(expected, date); - date = "04:43:10 2014/04/24"; - testHelper(expected, date); - date = "4/24/2014 04:43:10"; - testHelper(expected, date); - date = "04:43:10 4/24/2014"; - testHelper(expected, date); - date = "04:43:10 2014-04-24"; - testHelper(expected, date); - date = "4-24-2014 04:43:10"; - testHelper(expected, date); - date = "04:43:10 4-24-2014"; - testHelper(expected, date); - expected = new DateTime(2014, 4, 24, 0, 0, 0, DateTimeZone.forOffsetHours(0)); - date = "24-4-2014"; - testHelper(expected, date); - date = "2014-4-24"; - testHelper(expected, date); - date = "2014/4/24"; - testHelper(expected, date); - date = "2014/4/24 fesdfs"; - try { - RFC3339Utils.parseToUTC(date); - fail("Should not have been able to parse : "+date); - } catch (Exception e) { - } - } - - private void testHelper(DateTime expected, String dateString) { - DateTime parsedDate = RFC3339Utils.parseToUTC(dateString); - assertEquals("Failed to parse : "+dateString, expected, parsedDate); - String rfc3339String = RFC3339Utils.format(dateString); - String parsedRfc3339String = RFC3339Utils.format(parsedDate); - assertEquals("Parsed String should be equal.", parsedRfc3339String, rfc3339String); - DateTime convertedBack = RFC3339Utils.parseToUTC(parsedRfc3339String); - assertEquals(expected, convertedBack); - } + @Test + public void validUTC() { + DateTime parsed = parseUTC("2014-12-25T12:00:00Z"); + assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); + assertThat(parsed.hourOfDay().get(), is(equalTo(12))); + assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); + assertThat(parsed.monthOfYear().get(), is(equalTo(12))); + } + + @Test + public void validUTCSubSecond() { + DateTime parsed = parseUTC("2014-12-25T12:00:00.7Z"); + assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); + assertThat(parsed.hourOfDay().get(), is(equalTo(12))); + assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); + assertThat(parsed.monthOfYear().get(), is(equalTo(12))); + assertThat(parsed.millisOfSecond().get(), is(equalTo(700))); + } + + @Test + public void validUTCSubSecondMultiDigit() { + DateTime parsed = parseUTC("2014-12-25T12:00:00.7343Z"); + assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); + assertThat(parsed.hourOfDay().get(), is(equalTo(12))); + assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); + assertThat(parsed.monthOfYear().get(), is(equalTo(12))); + assertThat(parsed.millisOfSecond().get(), is(equalTo(734))); + } + + @Test + public void validEST() { + DateTime parsed = parseUTC("2014-12-25T12:00:00-05:00"); + assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); + assertThat(parsed.hourOfDay().get(), is(equalTo(17))); + assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); + assertThat(parsed.monthOfYear().get(), is(equalTo(12))); + } + + @Test + public void validESTSubSecond() { + DateTime parsed = parseUTC("2014-12-25T12:00:00.7-05:00"); + assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); + assertThat(parsed.hourOfDay().get(), is(equalTo(17))); + assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); + assertThat(parsed.monthOfYear().get(), is(equalTo(12))); + assertThat(parsed.millisOfSecond().get(), is(equalTo(700))); + } + + @Test + public void validESTSubSecondMultiDigit() { + DateTime parsed = parseUTC("2014-12-25T12:00:00.7343-05:00"); + assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); + assertThat(parsed.hourOfDay().get(), is(equalTo(17))); + assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); + assertThat(parsed.monthOfYear().get(), is(equalTo(12))); + assertThat(parsed.millisOfSecond().get(), is(equalTo(734))); + } + + @Test + public void validESTNoSeparator() { + DateTime parsed = parseUTC("2014-12-25T12:00:00-0500"); + assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); + assertThat(parsed.hourOfDay().get(), is(equalTo(17))); + assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); + assertThat(parsed.monthOfYear().get(), is(equalTo(12))); + } + + @Test + public void validESTSubSecondNoSeparator() { + DateTime parsed = parseUTC("2014-12-25T12:00:00.7-0500"); + assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); + assertThat(parsed.hourOfDay().get(), is(equalTo(17))); + assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); + assertThat(parsed.monthOfYear().get(), is(equalTo(12))); + assertThat(parsed.millisOfSecond().get(), is(equalTo(700))); + } + + @Test + public void validESTSubSecondMultiDigitNoSeparator() { + DateTime parsed = parseUTC("2014-12-25T12:00:00.7343-0500"); + assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); + assertThat(parsed.hourOfDay().get(), is(equalTo(17))); + assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); + assertThat(parsed.monthOfYear().get(), is(equalTo(12))); + assertThat(parsed.millisOfSecond().get(), is(equalTo(734))); + } + + @Test + public void validCET() { + DateTime parsed = parseUTC("2014-12-25T12:00:00+01:00"); + assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); + assertThat(parsed.hourOfDay().get(), is(equalTo(11))); + assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); + assertThat(parsed.monthOfYear().get(), is(equalTo(12))); + } + + @Test + public void validCETSubSecond() { + DateTime parsed = parseUTC("2014-12-25T12:00:00.7+01:00"); + assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); + assertThat(parsed.hourOfDay().get(), is(equalTo(11))); + assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); + assertThat(parsed.monthOfYear().get(), is(equalTo(12))); + assertThat(parsed.millisOfSecond().get(), is(equalTo(700))); + } + + @Test + public void validCETSubSecondMultidigit() { + DateTime parsed = parseUTC("2014-12-25T12:00:00.7343+01:00"); + assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); + assertThat(parsed.hourOfDay().get(), is(equalTo(11))); + assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); + assertThat(parsed.monthOfYear().get(), is(equalTo(12))); + assertThat(parsed.millisOfSecond().get(), is(equalTo(734))); + } + + @Test + public void validLong() { + DateTime parsed = parseUTC("1419505200734"); + assertThat(parsed.minuteOfHour().get(), is(equalTo(0))); + assertThat(parsed.hourOfDay().get(), is(equalTo(11))); + assertThat(parsed.dayOfMonth().get(), is(equalTo(25))); + assertThat(parsed.monthOfYear().get(), is(equalTo(12))); + assertThat(parsed.millisOfSecond().get(), is(equalTo(734))); + } + + @Test + public void validFormatUTC() { + DateTime parsed = new DateTime(1419505200734L); + assertThat(format(parsed), is(equalTo("2014-12-25T11:00:00.734Z"))); + } + + @Test + public void validFormat() { + TimeZone cet = TimeZone.getTimeZone("CET"); + DateTime parsed = new DateTime(1419505200734L); + assertThat(format(parsed, cet), is(equalTo("2014-12-25T12:00:00.734+0100"))); + } + + @Test + public void testParseVariousDateFormats() { + String date = "Thu April 24 04:43:10 -0500 2014"; + DateTime expected = new DateTime(2014, 4, 24, 9, 43, 10, DateTimeZone.forOffsetHours(0)); + testHelper(expected, date); + date = "2014/04/24 04:43:10"; + expected = new DateTime(2014, 4, 24, 4, 43, 10, DateTimeZone.forOffsetHours(0)); + testHelper(expected, date); + date = "2014-04-24T04:43:10Z"; + testHelper(expected, date); + date = "04:43:10 2014/04/24"; + testHelper(expected, date); + date = "4/24/2014 04:43:10"; + testHelper(expected, date); + date = "04:43:10 4/24/2014"; + testHelper(expected, date); + date = "04:43:10 2014-04-24"; + testHelper(expected, date); + date = "4-24-2014 04:43:10"; + testHelper(expected, date); + date = "04:43:10 4-24-2014"; + testHelper(expected, date); + expected = new DateTime(2014, 4, 24, 0, 0, 0, DateTimeZone.forOffsetHours(0)); + date = "24-4-2014"; + testHelper(expected, date); + date = "2014-4-24"; + testHelper(expected, date); + date = "2014/4/24"; + testHelper(expected, date); + date = "2014/4/24 fesdfs"; + try { + RFC3339Utils.parseToUTC(date); + fail("Should not have been able to parse : " + date); + } catch (Exception ex) { + // + } + } + + private void testHelper(DateTime expected, String dateString) { + DateTime parsedDate = RFC3339Utils.parseToUTC(dateString); + assertEquals("Failed to parse : " + dateString, expected, parsedDate); + String rfc3339String = RFC3339Utils.format(dateString); + String parsedRfc3339String = RFC3339Utils.format(parsedDate); + assertEquals("Parsed String should be equal.", parsedRfc3339String, rfc3339String); + DateTime convertedBack = RFC3339Utils.parseToUTC(parsedRfc3339String); + assertEquals(expected, convertedBack); + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java index ce4388e..a09abda 100644 --- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java +++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java @@ -18,19 +18,26 @@ package org.apache.streams.dropwizard; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Splitter; -import com.google.common.collect.Queues; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.util.ComponentUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Splitter; +import com.google.common.collect.Queues; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigInteger; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Pattern; import javax.annotation.Resource; import javax.ws.rs.Consumes; import javax.ws.rs.POST; @@ -40,17 +47,11 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.math.BigInteger; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Pattern; /** * GenericWebhookResource provides basic webhook connectivity. * + * <p/> * Add processors / persistWriters that read from "GenericWebhookResource" to * consume data posted to streams. */ @@ -60,205 +61,222 @@ import java.util.regex.Pattern; @Consumes(MediaType.APPLICATION_JSON) public class GenericWebhookResource implements StreamsProvider { - public static final String STREAMS_ID = "GenericWebhookResource"; + public static final String STREAMS_ID = "GenericWebhookResource"; - public GenericWebhookResource() { - } - - private static final Logger log = LoggerFactory - .getLogger(GenericWebhookResource.class); + public GenericWebhookResource() { + } - private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private static final Logger log = LoggerFactory + .getLogger(GenericWebhookResource.class); - protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>(); + private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>(); - private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE); + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - @Override - public String getId() { - return STREAMS_ID; - } + private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE); - @POST - @Path("json") - public Response json(@Context HttpHeaders headers, - String body) { + @Override + public String getId() { + return STREAMS_ID; + } - ObjectNode response = mapper.createObjectNode(); - int responseCode = Response.Status.BAD_REQUEST.getStatusCode(); + /** + * push a String json datum into a stream. + * @param headers HttpHeaders + * @param body String json + * @return Response + */ + @POST + @Path("json") + public Response json(@Context HttpHeaders headers, + String body) { - try { - ObjectNode item = mapper.readValue(body, ObjectNode.class); + ObjectNode response = mapper.createObjectNode(); + int responseCode = Response.Status.BAD_REQUEST.getStatusCode(); - StreamsDatum datum = new StreamsDatum(body); + try { + ObjectNode item = mapper.readValue(body, ObjectNode.class); - lock.writeLock().lock(); - ComponentUtils.offerUntilSuccess(datum, providerQueue); - lock.writeLock().unlock(); + StreamsDatum datum = new StreamsDatum(body); - Boolean success = true; + lock.writeLock().lock(); + ComponentUtils.offerUntilSuccess(datum, providerQueue); + lock.writeLock().unlock(); - response.put("success", success); + Boolean success = true; - responseCode = Response.Status.OK.getStatusCode(); + response.put("success", success); - } catch (Exception e) { - log.warn(e.toString(), e); + responseCode = Response.Status.OK.getStatusCode(); - Boolean success = false; + } catch (Exception ex) { + log.warn(ex.toString(), ex); - response.put("success", success); - responseCode = Response.Status.BAD_REQUEST.getStatusCode(); + Boolean success = false; - } finally { - return Response.status(responseCode).entity(response).build(); + response.put("success", success); + responseCode = Response.Status.BAD_REQUEST.getStatusCode(); - } + } finally { + return Response.status(responseCode).entity(response).build(); } + } - @POST - @Path("json_new_line") - public Response json_new_line(@Context HttpHeaders headers, - String body) { + /** + * push multiple String json datums into a stream. + * @param headers HttpHeaders + * @param body String json + * @return Response + */ + @POST + @Path("json_new_line") + public Response json_new_line(@Context HttpHeaders headers, + String body) { - ObjectNode response = mapper.createObjectNode(); - int responseCode = Response.Status.BAD_REQUEST.getStatusCode(); + ObjectNode response = mapper.createObjectNode(); + int responseCode = Response.Status.BAD_REQUEST.getStatusCode(); - if (body.equalsIgnoreCase("{}")) { + if (body.equalsIgnoreCase("{}")) { - Boolean success = true; + Boolean success = true; - response.put("success", success); - responseCode = Response.Status.OK.getStatusCode(); - return Response.status(responseCode).entity(response).build(); - } - - try { + response.put("success", success); + responseCode = Response.Status.OK.getStatusCode(); + return Response.status(responseCode).entity(response).build(); + } - for( String line : Splitter.on(newLinePattern).split(body)) { - ObjectNode item = mapper.readValue(line, ObjectNode.class); + try { - StreamsDatum datum = new StreamsDatum(item); + for ( String line : Splitter.on(newLinePattern).split(body)) { + ObjectNode item = mapper.readValue(line, ObjectNode.class); - lock.writeLock().lock(); - ComponentUtils.offerUntilSuccess(datum, providerQueue); - lock.writeLock().unlock(); + StreamsDatum datum = new StreamsDatum(item); - } + lock.writeLock().lock(); + ComponentUtils.offerUntilSuccess(datum, providerQueue); + lock.writeLock().unlock(); - Boolean success = true; + } - response.put("success", success); - responseCode = Response.Status.OK.getStatusCode(); + Boolean success = true; - } catch (Exception e) { - log.warn(e.toString(), e); + response.put("success", success); + responseCode = Response.Status.OK.getStatusCode(); - Boolean success = false; + } catch (Exception ex) { + log.warn(ex.toString(), ex); - response.put("success", success); - responseCode = Response.Status.BAD_REQUEST.getStatusCode(); + Boolean success = false; - } finally { - return Response.status(responseCode).entity(response).build(); + response.put("success", success); + responseCode = Response.Status.BAD_REQUEST.getStatusCode(); - } + } finally { + return Response.status(responseCode).entity(response).build(); } - @POST - @Path("json_meta") - public Response json_meta(@Context HttpHeaders headers, - String body) { + } - ObjectNode response = mapper.createObjectNode(); - int responseCode = Response.Status.BAD_REQUEST.getStatusCode(); + /** + * push multiple ObjectNode json datums into a stream. + * @param headers HttpHeaders + * @param body String json + * @return Response + */ + @POST + @Path("json_meta") + public Response json_meta(@Context HttpHeaders headers, + String body) { - if (body.equalsIgnoreCase("{}")) { + ObjectNode response = mapper.createObjectNode(); + int responseCode = Response.Status.BAD_REQUEST.getStatusCode(); - Boolean success = true; + if (body.equalsIgnoreCase("{}")) { - response.put("success", success); - responseCode = Response.Status.OK.getStatusCode(); + Boolean success = true; - return Response.status(responseCode).entity(response).build(); - } + response.put("success", success); + responseCode = Response.Status.OK.getStatusCode(); - try { + return Response.status(responseCode).entity(response).build(); + } - GenericWebhookData objectWrapper = mapper.readValue(body, GenericWebhookData.class); + try { - for( ObjectNode item : objectWrapper.getData()) { + GenericWebhookData objectWrapper = mapper.readValue(body, GenericWebhookData.class); - StreamsDatum datum = new StreamsDatum(item); + for ( ObjectNode item : objectWrapper.getData()) { - lock.writeLock().lock(); - ComponentUtils.offerUntilSuccess(datum, providerQueue); - lock.writeLock().unlock(); - } + StreamsDatum datum = new StreamsDatum(item); - Boolean success = true; + lock.writeLock().lock(); + ComponentUtils.offerUntilSuccess(datum, providerQueue); + lock.writeLock().unlock(); + } - response.put("success", success); - responseCode = Response.Status.OK.getStatusCode(); + Boolean success = true; - } catch (Exception e) { - log.warn(e.toString(), e); + response.put("success", success); + responseCode = Response.Status.OK.getStatusCode(); - Boolean success = false; + } catch (Exception ex) { + log.warn(ex.toString(), ex); - response.put("success", success); - responseCode = Response.Status.BAD_REQUEST.getStatusCode(); - } finally { - return Response.status(responseCode).entity(response).build(); - } + Boolean success = false; + response.put("success", success); + responseCode = Response.Status.BAD_REQUEST.getStatusCode(); + } finally { + return Response.status(responseCode).entity(response).build(); } - @Override - public void startStream() { - return; - } + } - @Override - public StreamsResultSet readCurrent() { + @Override + public void startStream() { + return; + } - StreamsResultSet current; + @Override + public StreamsResultSet readCurrent() { - lock.writeLock().lock(); - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue)); - providerQueue.clear(); - lock.writeLock().unlock(); + StreamsResultSet current; - return current; + lock.writeLock().lock(); + current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue)); + providerQueue.clear(); + lock.writeLock().unlock(); - } + return current; - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return null; - } + } - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; - } + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } - @Override - public boolean isRunning() { - return true; - } + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } - @Override - public void prepare(Object configurationObject) { + @Override + public boolean isRunning() { + return true; + } - } + @Override + public void prepare(Object configurationObject) { - @Override - public void cleanUp() { + } - } + @Override + public void cleanUp() { + + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java index 0fcc4eb..f13a41f 100644 --- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java +++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java @@ -18,46 +18,47 @@ package org.apache.streams.dropwizard; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamBuilder; import org.apache.streams.core.StreamsProvider; import org.apache.streams.local.LocalRuntimeConfiguration; import org.apache.streams.local.builders.LocalStreamBuilder; +import com.fasterxml.jackson.databind.ObjectMapper; + import java.util.Map; /** - * StreamDropwizardBuilder is currently a light wrapper around LocalStreamBuilder + * StreamDropwizardBuilder is currently a light wrapper around LocalStreamBuilder. * + * <p/> * It's a seperate class because they will almost certainly deviate going forward */ public class StreamDropwizardBuilder extends LocalStreamBuilder implements StreamBuilder { - public StreamDropwizardBuilder() { - super(); - } + public StreamDropwizardBuilder() { + super(); + } - public StreamDropwizardBuilder(StreamsConfiguration streamConfig) { - super(new ObjectMapper().convertValue(streamConfig, LocalRuntimeConfiguration.class)); - } + public StreamDropwizardBuilder(StreamsConfiguration streamConfig) { + super(new ObjectMapper().convertValue(streamConfig, LocalRuntimeConfiguration.class)); + } - public StreamDropwizardBuilder(Map<String, Object> streamConfig) { - super(streamConfig); - } + public StreamDropwizardBuilder(Map<String, Object> streamConfig) { + super(streamConfig); + } - public StreamDropwizardBuilder(int maxQueueCapacity) { - super(maxQueueCapacity); - } + public StreamDropwizardBuilder(int maxQueueCapacity) { + super(maxQueueCapacity); + } - public StreamDropwizardBuilder(int maxQueueCapacity, Map<String, Object> streamConfig) { - super(maxQueueCapacity, streamConfig); - } + public StreamDropwizardBuilder(int maxQueueCapacity, Map<String, Object> streamConfig) { + super(maxQueueCapacity, streamConfig); + } - @Override - public StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider) { - return super.newPerpetualStream(streamId, provider); - } + @Override + public StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider) { + return super.newPerpetualStream(streamId, provider); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java index fecf2f7..38d0f7b 100644 --- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java +++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java @@ -18,175 +18,163 @@ package org.apache.streams.dropwizard; -import com.codahale.metrics.Counter; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.core.StreamBuilder; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.codahale.metrics.MetricRegistry; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.guava.GuavaModule; import com.fasterxml.jackson.module.afterburner.AfterburnerModule; -import com.google.common.base.Strings; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.hubspot.dropwizard.guice.GuiceBundle; -import com.sun.jersey.api.core.ResourceConfig; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigRenderOptions; import io.dropwizard.Application; import io.dropwizard.jackson.GuavaExtrasModule; import io.dropwizard.metrics.MetricsFactory; import io.dropwizard.setup.Bootstrap; import io.dropwizard.setup.Environment; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamBuilder; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistWriter; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.local.builders.LocalStreamBuilder; -import org.apache.streams.pojo.json.Activity; -import org.joda.time.DateTime; -import org.reflections.Reflections; -import org.reflections.util.ConfigurationBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.math.BigInteger; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; - -import com.google.inject.Inject; - -import javax.annotation.Resource; -import javax.ws.rs.Path; /** * Entry point to a dropwizard streams application * + * <p/> * It will start up a stream in the local runtime, as well as bind any * StreamsProvider on the classpath with a @Resource annotation. - * */ public class StreamsApplication extends Application<StreamsDropwizardConfiguration> { - private static final Logger LOGGER = LoggerFactory - .getLogger(StreamsApplication.class); - - protected static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private static final Logger LOGGER = LoggerFactory + .getLogger(StreamsApplication.class); - protected StreamBuilder builder; + protected static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - private static StreamsConfiguration streamsConfiguration; + protected StreamBuilder builder; - // ConcurrentHashSet is preferable, but it's only in guava 15+ - // spark 1.5.0 uses guava 14 so for the moment this is the workaround - // Set<StreamsProvider> resourceProviders = Sets.newConcurrentHashSet(); - private Set<StreamsProvider> resourceProviders = Collections.newSetFromMap(new ConcurrentHashMap<StreamsProvider, Boolean>()); + private static StreamsConfiguration streamsConfiguration; - private Executor executor = Executors.newSingleThreadExecutor(); + // ConcurrentHashSet is preferable, but it's only in guava 15+ + // spark 1.5.0 uses guava 14 so for the moment this is the workaround + // Set<StreamsProvider> resourceProviders = Sets.newConcurrentHashSet(); + private Set<StreamsProvider> resourceProviders = Collections.newSetFromMap(new ConcurrentHashMap<StreamsProvider, Boolean>()); - static { - mapper.registerModule(new AfterburnerModule()); - mapper.registerModule(new GuavaModule()); - mapper.registerModule(new GuavaExtrasModule()); - } + private Executor executor = Executors.newSingleThreadExecutor(); - @Override - public void initialize(Bootstrap<StreamsDropwizardConfiguration> bootstrap) { + static { + mapper.registerModule(new AfterburnerModule()); + mapper.registerModule(new GuavaModule()); + mapper.registerModule(new GuavaExtrasModule()); + } - LOGGER.info(getClass().getPackage().getName()); + @Override + public void initialize(Bootstrap<StreamsDropwizardConfiguration> bootstrap) { - GuiceBundle<StreamsDropwizardConfiguration> guiceBundle = - GuiceBundle.<StreamsDropwizardConfiguration>newBuilder() - .addModule(new StreamsDropwizardModule()) - .setConfigClass(StreamsDropwizardConfiguration.class) - // override and add more packages to pick up custom Resources - .enableAutoConfig(getClass().getPackage().getName()) - .build(); - bootstrap.addBundle(guiceBundle); + LOGGER.info(getClass().getPackage().getName()); - } + GuiceBundle<StreamsDropwizardConfiguration> guiceBundle = + GuiceBundle.<StreamsDropwizardConfiguration>newBuilder() + .addModule(new StreamsDropwizardModule()) + .setConfigClass(StreamsDropwizardConfiguration.class) + // override and add more packages to pick up custom Resources + .enableAutoConfig(getClass().getPackage().getName()) + .build(); + bootstrap.addBundle(guiceBundle); - @Override - public void run(StreamsDropwizardConfiguration streamsDropwizardConfiguration, Environment environment) throws Exception { + } - executor = Executors.newSingleThreadExecutor(); + @Override + public void run(StreamsDropwizardConfiguration streamsDropwizardConfiguration, Environment environment) throws Exception { - for( Class<?> resourceProviderClass : environment.jersey().getResourceConfig().getRootResourceClasses() ) { - StreamsProvider provider = (StreamsProvider)resourceProviderClass.newInstance(); - if( StreamsProvider.class.isInstance(provider)) - resourceProviders.add(provider); - } + executor = Executors.newSingleThreadExecutor(); - MetricRegistry metrics = new MetricRegistry(); - MetricsFactory mfac = streamsDropwizardConfiguration.getMetricsFactory(); - mfac.configure(environment.lifecycle(), metrics); + for ( Class<?> resourceProviderClass : environment.jersey().getResourceConfig().getRootResourceClasses() ) { + StreamsProvider provider = (StreamsProvider)resourceProviderClass.newInstance(); + if ( StreamsProvider.class.isInstance(provider)) { + resourceProviders.add(provider); + } + } - streamsConfiguration = mapper.convertValue(streamsDropwizardConfiguration, StreamsConfiguration.class); + MetricRegistry metrics = new MetricRegistry(); + MetricsFactory mfac = streamsDropwizardConfiguration.getMetricsFactory(); + mfac.configure(environment.lifecycle(), metrics); - builder = setup(streamsConfiguration, resourceProviders); + streamsConfiguration = mapper.convertValue(streamsDropwizardConfiguration, StreamsConfiguration.class); - executor.execute(new StreamsDropwizardRunner(builder, streamsConfiguration)); + builder = setup(streamsConfiguration, resourceProviders); - // wait for streams to start up - Thread.sleep(10000); + executor.execute(new StreamsDropwizardRunner(builder, streamsConfiguration)); - for (StreamsProvider resource : resourceProviders) { - environment.jersey().register(resource); - LOGGER.info("Added resource class: {}", resource); - } + // wait for streams to start up + Thread.sleep(10000); + for (StreamsProvider resource : resourceProviders) { + environment.jersey().register(resource); + LOGGER.info("Added resource class: {}", resource); } - public StreamBuilder setup(StreamsConfiguration streamsConfiguration, Set<StreamsProvider> resourceProviders) { + } - StreamBuilder builder = new StreamDropwizardBuilder(streamsConfiguration); + /** + * setup StreamBuilder. + * @param streamsConfiguration StreamsConfiguration + * @param resourceProviders Set of StreamsProvider + * @return StreamBuilder + */ + public StreamBuilder setup(StreamsConfiguration streamsConfiguration, Set<StreamsProvider> resourceProviders) { - List<String> providers = new ArrayList<>(); - for( StreamsProvider provider: resourceProviders) { - String providerId = provider.getClass().getSimpleName(); - builder.newPerpetualStream(providerId, provider); - providers.add(providerId); - } + StreamBuilder builder = new StreamDropwizardBuilder(streamsConfiguration); - return builder; + List<String> providers = new ArrayList<>(); + for ( StreamsProvider provider: resourceProviders) { + String providerId = provider.getClass().getSimpleName(); + builder.newPerpetualStream(providerId, provider); + providers.add(providerId); } - private class StreamsDropwizardRunner implements Runnable { - - private StreamsConfiguration streamsConfiguration; - - private StreamBuilder builder; + return builder; + } - protected StreamsDropwizardRunner(StreamBuilder builder, StreamsConfiguration streamsConfiguration) { - this.streamsConfiguration = streamsConfiguration; - this.builder = builder; - } + private class StreamsDropwizardRunner implements Runnable { - @Override - public void run() { + private StreamsConfiguration streamsConfiguration; - builder.start(); + private StreamBuilder builder; - } + protected StreamsDropwizardRunner(StreamBuilder builder, StreamsConfiguration streamsConfiguration) { + this.streamsConfiguration = streamsConfiguration; + this.builder = builder; } + @Override + public void run() { - public static void main(String[] args) throws Exception - { - - new StreamsApplication().run(args); + builder.start(); } + } + + /** + * Run from console: + * + * <p/> + * java -jar uber.jar server ./configuration.yml + * + * @param args ["server", configuration.yml] + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + new StreamsApplication().run(args); + + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java index 01682c0..9514caf 100644 --- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java +++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java @@ -18,35 +18,31 @@ package org.apache.streams.dropwizard; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.inject.AbstractModule; -import com.google.inject.Provides; -import com.google.inject.Singleton; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigRenderOptions; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; -import java.io.IOException; +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.google.inject.Singleton; /** * This class exists because dropwizard-guice requires at least - * one module to run + * one module to run. * + * <p/> * Do not expect @Inject StreamsConfiguration to work at the moment. */ public class StreamsDropwizardModule extends AbstractModule { - @Override - protected void configure() { - requestStaticInjection(StreamsConfiguration.class); - } + @Override + protected void configure() { + requestStaticInjection(StreamsConfiguration.class); + } - @Provides - @Singleton - public StreamsConfiguration providesStreamsConfiguration() { - return StreamsConfigurator.detectConfiguration(); - } + @Provides + @Singleton + public StreamsConfiguration providesStreamsConfiguration() { + return StreamsConfigurator.detectConfiguration(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java index ffe9d62..a7251df 100644 --- a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java +++ b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java @@ -18,82 +18,70 @@ package org.apache.streams.dropwizard.test; -import com.fasterxml.jackson.core.JsonParseException; +import org.apache.streams.dropwizard.GenericWebhookData; +import org.apache.streams.dropwizard.GenericWebhookResource; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; -import com.google.common.io.Resources; -import io.dropwizard.testing.junit.DropwizardAppRule; import io.dropwizard.testing.junit.ResourceTestRule; -import org.apache.streams.dropwizard.GenericWebhookData; -import org.apache.streams.dropwizard.GenericWebhookResource; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.joda.time.DateTime; -import org.junit.After; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; -import javax.ws.rs.core.HttpHeaders; -import javax.xml.ws.Response; - import java.util.List; -import static org.mockito.Mockito.*; - /** * Tests {@link: org.apache.streams.dropwizard.GenericWebhookResource} */ public class GenericWebhookResourceTest { - private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - private static final GenericWebhookResource genericWebhookResource = new GenericWebhookResource(); + private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - @ClassRule - public static final ResourceTestRule resources = ResourceTestRule.builder() - .addResource(genericWebhookResource) - .build(); + private static final GenericWebhookResource genericWebhookResource = new GenericWebhookResource(); - @Test - public void testPostJson() { - Assert.assertEquals(400, genericWebhookResource.json(null, "{").getStatus()); - Assert.assertEquals(400, genericWebhookResource.json(null, "}").getStatus()); - Assert.assertEquals(400, genericWebhookResource.json(null, "srg").getStatus()); - Assert.assertEquals(400, genericWebhookResource.json(null, "123").getStatus()); - Assert.assertEquals(200, genericWebhookResource.json(null, "{}").getStatus()); - Assert.assertEquals(200, genericWebhookResource.json(null, "{\"valid\":\"true\"}").getStatus()); - }; + @ClassRule + public static final ResourceTestRule resources = ResourceTestRule.builder() + .addResource(genericWebhookResource) + .build(); - @Test - public void testPostJsonNewLine() { - Assert.assertEquals(200, genericWebhookResource.json_new_line(null, "{}").getStatus()); - Assert.assertEquals(400, genericWebhookResource.json_new_line(null, "notvalid").getStatus()); - Assert.assertEquals(200, genericWebhookResource.json_new_line(null, "{\"valid\":\"true\"}").getStatus()); - Assert.assertEquals(200, genericWebhookResource.json_new_line(null, "{\"valid\":\"true\"}\n{\"valid\":\"true\"}\r{\"valid\":\"true\"}").getStatus()); - }; + @Test + public void testPostJson() { + Assert.assertEquals(400, genericWebhookResource.json(null, "{").getStatus()); + Assert.assertEquals(400, genericWebhookResource.json(null, "}").getStatus()); + Assert.assertEquals(400, genericWebhookResource.json(null, "srg").getStatus()); + Assert.assertEquals(400, genericWebhookResource.json(null, "123").getStatus()); + Assert.assertEquals(200, genericWebhookResource.json(null, "{}").getStatus()); + Assert.assertEquals(200, genericWebhookResource.json(null, "{\"valid\":\"true\"}").getStatus()); + } - @Test - public void testPostJsonMeta() throws JsonProcessingException { - Assert.assertEquals(200, genericWebhookResource.json_meta(null, "{}").getStatus()); - Assert.assertEquals(400, genericWebhookResource.json_meta(null, "notvalid").getStatus()); - GenericWebhookData testPostJsonMeta = new GenericWebhookData() - .withHash("test") - .withDeliveredAt(DateTime.now()) - .withCount(1) - .withHashType("type") - .withId("test"); - List<ObjectNode> testPostJsonData = Lists.newArrayList(); - testPostJsonData.add(mapper.createObjectNode().put("valid", "true")); - testPostJsonMeta.setData(testPostJsonData); - String testPostJsonEntity = mapper.writeValueAsString(testPostJsonMeta); - Assert.assertEquals(200, genericWebhookResource.json_meta(null, testPostJsonEntity).getStatus()); + @Test + public void testPostJsonNewLine() { + Assert.assertEquals(200, genericWebhookResource.json_new_line(null, "{}").getStatus()); + Assert.assertEquals(400, genericWebhookResource.json_new_line(null, "notvalid").getStatus()); + Assert.assertEquals(200, genericWebhookResource.json_new_line(null, "{\"valid\":\"true\"}").getStatus()); + Assert.assertEquals(200, genericWebhookResource.json_new_line(null, "{\"valid\":\"true\"}\n{\"valid\":\"true\"}\r{\"valid\":\"true\"}").getStatus()); + } - }; + @Test + public void testPostJsonMeta() throws JsonProcessingException { + Assert.assertEquals(200, genericWebhookResource.json_meta(null, "{}").getStatus()); + Assert.assertEquals(400, genericWebhookResource.json_meta(null, "notvalid").getStatus()); + GenericWebhookData testPostJsonMeta = new GenericWebhookData() + .withHash("test") + .withDeliveredAt(DateTime.now()) + .withCount(1) + .withHashType("type") + .withId("test"); + List<ObjectNode> testPostJsonData = Lists.newArrayList(); + testPostJsonData.add(mapper.createObjectNode().put("valid", "true")); + testPostJsonMeta.setData(testPostJsonData); + String testPostJsonEntity = mapper.writeValueAsString(testPostJsonMeta); + Assert.assertEquals(200, genericWebhookResource.json_meta(null, testPostJsonEntity).getStatus()); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/StreamsApplicationIT.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/StreamsApplicationIT.java b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/StreamsApplicationIT.java index 1ab0c2e..788a523 100644 --- a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/StreamsApplicationIT.java +++ b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/StreamsApplicationIT.java @@ -32,17 +32,17 @@ import java.net.URL; */ public class StreamsApplicationIT { - @Before - public void setupTest() throws Exception { - String[] testArgs = Lists.newArrayList("server", "src/test/resources/configuration.yml").toArray(new String[2]); - TestStreamsApplication.main(testArgs); - } + @Before + public void setupTest() throws Exception { + String[] testArgs = Lists.newArrayList("server", "src/test/resources/configuration.yml").toArray(new String[2]); + TestStreamsApplication.main(testArgs); + } - @Test - public void testApplicationStarted() throws Exception { + @Test + public void testApplicationStarted() throws Exception { - final URL url = new URL("http://localhost:8003/admin/ping"); - final String response = new BufferedReader(new InputStreamReader(url.openStream())).readLine(); - Assert.assertEquals("pong", response); - } + final URL url = new URL("http://localhost:8003/admin/ping"); + final String response = new BufferedReader(new InputStreamReader(url.openStream())).readLine(); + Assert.assertEquals("pong", response); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/InvalidStreamException.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/InvalidStreamException.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/InvalidStreamException.java index 3511b3d..c44a8e4 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/InvalidStreamException.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/InvalidStreamException.java @@ -23,19 +23,19 @@ package org.apache.streams.local.builders; */ public class InvalidStreamException extends RuntimeException { - public InvalidStreamException() { - super(); - } + public InvalidStreamException() { + super(); + } - public InvalidStreamException(String s) { - super(s); - } + public InvalidStreamException(String string) { + super(string); + } - public InvalidStreamException(String s, Throwable throwable) { - super(s, throwable); - } + public InvalidStreamException(String string, Throwable throwable) { + super(string, throwable); + } - public InvalidStreamException(Throwable throwable) { - super(throwable); - } + public InvalidStreamException(Throwable throwable) { + super(throwable); + } }
