http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeEventClassifierTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeEventClassifierTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeEventClassifierTest.java index 57f9be5..16565bb 100644 --- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeEventClassifierTest.java +++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeEventClassifierTest.java @@ -18,32 +18,33 @@ package com.youtube.serializer; -import com.google.api.services.youtube.model.Video; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.api.services.youtube.model.Video; import org.junit.Test; - import static org.junit.Assert.assertEquals; public class YoutubeEventClassifierTest { - private final String testVideo = "{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart failure, treatment options, and lifestyle changes. Learn more: https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be Heart Smart: Congestive Heart Failure LIVE Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}"; - private final String testObjectNode = "{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#somethingElse\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart failure, treatment options, and lifestyle changes. Learn more: https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be Heart Smart: Congestive Heart Failure LIVE Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}"; - @Test - public void testVideoClassification() { - Class klass = YoutubeEventClassifier.detectClass(testVideo); - - assertEquals(klass, Video.class); - } - - @Test(expected=IllegalArgumentException.class) - public void testExceptionClassification() { - YoutubeEventClassifier.detectClass(""); - } - - @Test - public void testObjectNodeClassification() { - Class klass = YoutubeEventClassifier.detectClass(testObjectNode); - - assertEquals(klass, ObjectNode.class); - } + + private final String testVideo = "{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart failure, treatment options, and lifestyle changes. Learn more: https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be Heart Smart: Congestive Heart Failure LIVE Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}"; + private final String testObjectNode = "{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#somethingElse\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart failure, treatment options, and lifestyle changes. Learn more: https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be Heart Smart: Congestive Heart Failure LIVE Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}"; + + @Test + public void testVideoClassification() { + Class klass = YoutubeEventClassifier.detectClass(testVideo); + + assertEquals(klass, Video.class); + } + + @Test(expected = IllegalArgumentException.class) + public void testExceptionClassification() { + YoutubeEventClassifier.detectClass(""); + } + + @Test + public void testObjectNodeClassification() { + Class klass = YoutubeEventClassifier.detectClass(testObjectNode); + + assertEquals(klass, ObjectNode.class); + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeVideoSerDeTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeVideoSerDeTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeVideoSerDeTest.java index c162f41..29afd19 100644 --- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeVideoSerDeTest.java +++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeVideoSerDeTest.java @@ -15,17 +15,19 @@ * specific language governing permissions and limitations * under the License. */ + package com.youtube.serializer; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.api.services.youtube.model.Video; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.extensions.ExtensionUtil; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.pojo.json.Provider; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.api.services.youtube.model.Video; import org.joda.time.DateTime; import org.junit.Before; import org.junit.Test; @@ -38,75 +40,84 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +/** + * Test for YoutubeVideoSerDe. + */ public class YoutubeVideoSerDeTest { - private final static Logger LOGGER = LoggerFactory.getLogger(YoutubeVideoSerDeTest.class); - private final String testVideo = "{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart failure, treatment options, and lifestyle changes. Learn more: https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be Heart Smart: Congestive Heart Failure LIVE Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}"; - private ObjectMapper objectMapper; - private YoutubeActivityUtil youtubeActivityUtil; - - @Before - public void setup() { - objectMapper = StreamsJacksonMapper.getInstance(); - SimpleModule simpleModule = new SimpleModule(); - simpleModule.addDeserializer(Video.class, new YoutubeVideoDeserializer()); - objectMapper.registerModule(simpleModule); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - - youtubeActivityUtil = new YoutubeActivityUtil(); - } - @Test - public void testVideoObject() { - LOGGER.info("raw: {}", testVideo); + private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeVideoSerDeTest.class); + private final String testVideo = "{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart failure, treatment options, and lifestyle changes. Learn more: https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be Heart Smart: Congestive Heart Failure LIVE Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}"; + private ObjectMapper objectMapper; + private YoutubeActivityUtil youtubeActivityUtil; - try { - Activity activity = new Activity(); + /** + * setup for test. + */ + @Before + public void setup() { + objectMapper = StreamsJacksonMapper.getInstance(); + SimpleModule simpleModule = new SimpleModule(); + simpleModule.addDeserializer(Video.class, new YoutubeVideoDeserializer()); + objectMapper.registerModule(simpleModule); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - Video video = objectMapper.readValue(testVideo, Video.class); + youtubeActivityUtil = new YoutubeActivityUtil(); + } - youtubeActivityUtil.updateActivity(video, activity, "testChannelId"); - LOGGER.info("activity: {}", activity); + @Test + public void testVideoObject() { + LOGGER.info("raw: {}", testVideo); - assertNotNull(activity); - assert (activity.getId().contains("id:youtube:post")); - assertEquals(activity.getVerb(), "post"); + try { + Activity activity = new Activity(); - Provider provider = activity.getProvider(); - assertEquals(provider.getId(), "id:providers:youtube"); - assertEquals(provider.getDisplayName(), "YouTube"); + Video video = objectMapper.readValue(testVideo, Video.class); - ActivityObject actor = activity.getActor(); - assert (actor.getId().contains("id:youtube:")); - assertNotNull(actor.getDisplayName()); - assertNotNull(actor.getSummary()); + youtubeActivityUtil.updateActivity(video, activity, "testChannelId"); + LOGGER.info("activity: {}", activity); - assertNotNull(activity.getTitle()); - assertNotNull(activity.getUrl()); - assertNotNull(activity.getContent()); + assertNotNull(activity); + assert (activity.getId().contains("id:youtube:post")); + assertEquals(activity.getVerb(), "post"); - assertEquals(activity.getPublished().getClass(), DateTime.class); + Provider provider = activity.getProvider(); + assertEquals(provider.getId(), "id:providers:youtube"); + assertEquals(provider.getDisplayName(), "YouTube"); - Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); + ActivityObject actor = activity.getActor(); + assert (actor.getId().contains("id:youtube:")); + assertNotNull(actor.getDisplayName()); + assertNotNull(actor.getSummary()); - assertNotNull(extensions.get("youtube")); - assertNotNull(extensions.get("likes")); + assertNotNull(activity.getTitle()); + assertNotNull(activity.getUrl()); + assertNotNull(activity.getContent()); - assertTrue(testActivityObject(activity)); - } catch (Exception e) { - LOGGER.error("Exception while testing the Ser/De functionality of the Video deserializer: {}", e); - } - } + assertEquals(activity.getPublished().getClass(), DateTime.class); - private boolean testActivityObject(Activity activity) { - boolean valid = false; + Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); - ActivityObject obj = activity.getObject(); + assertNotNull(extensions.get("youtube")); + assertNotNull(extensions.get("likes")); - if(obj.getObjectType().equals("video") && !obj.getImage().equals(null) && - !obj.getUrl().equals("null") && obj.getUrl().contains("https://www.youtube.com/watch?v=")) { - valid = true; - } + assertTrue(testActivityObject(activity)); + } catch (Exception ex) { + LOGGER.error("Exception while testing the Ser/De functionality of the Video deserializer: {}", ex); + } + } + + private boolean testActivityObject(Activity activity) { + boolean valid = false; - return valid; + ActivityObject obj = activity.getObject(); + + if ( obj.getObjectType().equals("video") + && !obj.getImage().equals(null) + && !obj.getUrl().equals("null") + && obj.getUrl().contains("https://www.youtube.com/watch?v=")) { + valid = true; } + + return valid; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java index 1f53df8..2e143de 100644 --- a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java +++ b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java @@ -28,43 +28,43 @@ import java.io.FileReader; import java.io.LineNumberReader; /** - * Created by sblackmon on 10/13/16. + * YoutubeChannelProviderIT integration test for YoutubeChannelProvider. */ public class YoutubeChannelProviderIT { - private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeChannelProviderIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeChannelProviderIT.class); - @Test - public void testYoutubeChannelProvider() throws Exception { + @Test + public void testYoutubeChannelProvider() throws Exception { - String configfile = "./target/test-classes/YoutubeChannelProviderIT.conf"; - String outfile = "./target/test-classes/YoutubeChannelProviderIT.stdout.txt"; + String configfile = "./target/test-classes/YoutubeChannelProviderIT.conf"; + String outfile = "./target/test-classes/YoutubeChannelProviderIT.stdout.txt"; - String[] args = new String[2]; - args[0] = configfile; - args[1] = outfile; + String[] args = new String[2]; + args[0] = configfile; + args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { - try { - YoutubeChannelProvider.main(args); - } catch( Exception e ) { - LOGGER.error("Test Exception!", e); - } - }); - testThread.start(); - testThread.join(60000); + Thread testThread = new Thread((Runnable) () -> { + try { + YoutubeChannelProvider.main(args); + } catch ( Exception ex ) { + LOGGER.error("Test Exception!", ex); + } + }); + testThread.start(); + testThread.join(60000); - File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); - FileReader outReader = new FileReader(out); - LineNumberReader outCounter = new LineNumberReader(outReader); + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); - while(outCounter.readLine() != null) {} + while (outCounter.readLine() != null) {} - assert (outCounter.getLineNumber() >= 1); + assert (outCounter.getLineNumber() >= 1); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java index a8b92cf..dd0eaab 100644 --- a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java +++ b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java @@ -28,43 +28,43 @@ import java.io.FileReader; import java.io.LineNumberReader; /** - * Created by sblackmon on 10/13/16. + * Integration Test for YoutubeUserActivityProvider. */ public class YoutubeUserActivityProviderIT { - private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeUserActivityProviderIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeUserActivityProviderIT.class); - @Test - public void testYoutubeUserActivityProvider() throws Exception { + @Test + public void testYoutubeUserActivityProvider() throws Exception { - String configfile = "./target/test-classes/YoutubeUserActivityProviderIT.conf"; - String outfile = "./target/test-classes/YoutubeUserActivityProviderIT.stdout.txt"; + String configfile = "./target/test-classes/YoutubeUserActivityProviderIT.conf"; + String outfile = "./target/test-classes/YoutubeUserActivityProviderIT.stdout.txt"; - String[] args = new String[2]; - args[0] = configfile; - args[1] = outfile; + String[] args = new String[2]; + args[0] = configfile; + args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { - try { - YoutubeUserActivityProvider.main(args); - } catch( Exception e ) { - LOGGER.error("Test Exception!", e); - } - }); - testThread.start(); - testThread.join(60000); + Thread testThread = new Thread((Runnable) () -> { + try { + YoutubeUserActivityProvider.main(args); + } catch ( Exception ex ) { + LOGGER.error("Test Exception!", ex); + } + }); + testThread.start(); + testThread.join(60000); - File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); - FileReader outReader = new FileReader(out); - LineNumberReader outCounter = new LineNumberReader(outReader); + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); - while(outCounter.readLine() != null) {} + while (outCounter.readLine() != null) {} - assert (outCounter.getLineNumber() >= 250); + assert (outCounter.getLineNumber() >= 250); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/DatumStatus.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatus.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatus.java index b85b2d0..5846665 100644 --- a/streams-core/src/main/java/org/apache/streams/core/DatumStatus.java +++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatus.java @@ -18,8 +18,11 @@ package org.apache.streams.core; +/** + * Status of StreamsDatum. + */ public enum DatumStatus { - SUCCESS, - PARTIAL, - FAIL + SUCCESS, + PARTIAL, + FAIL } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java index 58518b9..c00e378 100644 --- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java +++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java @@ -24,7 +24,7 @@ package org.apache.streams.core; @Deprecated public interface DatumStatusCountable { - @Deprecated - public DatumStatusCounter getDatumStatusCounter(); + @Deprecated + public DatumStatusCounter getDatumStatusCounter(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java index 2e598ce..250cc7e 100644 --- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java +++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java @@ -21,72 +21,115 @@ package org.apache.streams.core; import java.io.Serializable; @Deprecated -public class DatumStatusCounter implements Serializable -{ - private volatile int attempted = 0; - private volatile int success = 0; - private volatile int fail = 0; - private volatile int partial = 0; - private volatile int emitted = 0; - - public int getAttempted() { return this.attempted; } - public int getSuccess() { return this.success; } - public int getFail() { return this.fail; } - public int getPartial() { return this.partial; } - public int getEmitted() { return this.emitted; } - - public DatumStatusCounter() { - } +public class DatumStatusCounter implements Serializable { - @Deprecated - public void add(DatumStatusCounter datumStatusCounter) { - this.attempted += datumStatusCounter.getAttempted(); - this.success += datumStatusCounter.getSuccess(); - this.partial = datumStatusCounter.getPartial(); - this.fail += datumStatusCounter.getFail(); - this.emitted += datumStatusCounter.getEmitted(); - } + private volatile int attempted = 0; + private volatile int success = 0; + private volatile int fail = 0; + private volatile int partial = 0; + private volatile int emitted = 0; - @Deprecated - public void incrementAttempt() { - this.attempted += 1; - } + public int getAttempted() { + return this.attempted; + } - @Deprecated - public void incrementAttempt(int counter) { - this.attempted += counter; - } + public int getSuccess() { + return this.success; + } - @Deprecated - public synchronized void incrementStatus(DatumStatus workStatus) { - // add this to the record counter - switch(workStatus) { - case SUCCESS: this.success++; break; - case PARTIAL: this.partial++; break; - case FAIL: this.fail++; break; - } - this.emitted += 1; - } + public int getFail() { + return this.fail; + } + + public int getPartial() { + return this.partial; + } - @Deprecated - public synchronized void incrementStatus(DatumStatus workStatus, int counter) { - // add this to the record counter - switch(workStatus) { - case SUCCESS: this.success += counter; break; - case PARTIAL: this.partial += counter; break; - case FAIL: this.fail += counter; break; - } - this.emitted += counter; + public int getEmitted() { + return this.emitted; + } + + public DatumStatusCounter() { + } + + /** + * accumulate partial DatumStatusCounter. + * @param datumStatusCounter DatumStatusCounter + */ + @Deprecated + public void add(DatumStatusCounter datumStatusCounter) { + this.attempted += datumStatusCounter.getAttempted(); + this.success += datumStatusCounter.getSuccess(); + this.partial = datumStatusCounter.getPartial(); + this.fail += datumStatusCounter.getFail(); + this.emitted += datumStatusCounter.getEmitted(); + } + + @Deprecated + public void incrementAttempt() { + this.attempted += 1; + } + + @Deprecated + public void incrementAttempt(int counter) { + this.attempted += counter; + } + + /** + * increment specific DatumStatus by 1. + * @param workStatus DatumStatus + */ + @Deprecated + public synchronized void incrementStatus(DatumStatus workStatus) { + // add this to the record counter + switch (workStatus) { + case SUCCESS: + this.success++; + break; + case PARTIAL: + this.partial++; + break; + case FAIL: + this.fail++; + break; + default: + break; } + this.emitted += 1; + } - @Override - public String toString() { - return "DatumStatusCounter{" + - "attempted=" + attempted + - ", success=" + success + - ", fail=" + fail + - ", partial=" + partial + - ", emitted=" + emitted + - '}'; + /** + * increment specific DatumStatus by count. + * @param workStatus DatumStatus + * @param counter counter + */ + @Deprecated + public synchronized void incrementStatus(DatumStatus workStatus, int counter) { + // add this to the record counter + switch (workStatus) { + case SUCCESS: + this.success += counter; + break; + case PARTIAL: + this.partial += counter; + break; + case FAIL: + this.fail += counter; + break; + default: + break; } + this.emitted += counter; + } + + @Override + public String toString() { + return "DatumStatusCounter{" + + "attempted=" + attempted + + ", success=" + success + + ", fail=" + fail + + ", partial=" + partial + + ", emitted=" + emitted + + '}'; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java index 39bc937..1052647 100644 --- a/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java +++ b/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java @@ -19,6 +19,7 @@ package org.apache.streams.core; import org.apache.streams.config.StreamsConfiguration; + import org.joda.time.DateTime; import java.io.Serializable; @@ -39,82 +40,82 @@ import java.math.BigInteger; */ public interface StreamBuilder extends Serializable { - public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration); - - public StreamsConfiguration getStreamsConfiguration(); - - /** - * Add a {@link org.apache.streams.core.StreamsProcessor} to the data processing stream. - * @param processorId unique id for this processor - must be unique across the entire stream - * @param processor the processor to execute - * @param numTasks the number of instances of this processor to run concurrently - * @param connectToIds the ids of the {@link org.apache.streams.core.StreamsOperation} that this process will - * receive data from. - * @return this - */ - public StreamBuilder addStreamsProcessor(String processorId, StreamsProcessor processor, int numTasks, String... connectToIds); - - /** - * Add a {@link org.apache.streams.core.StreamsPersistWriter} to the data processing stream. - * @param persistWriterId unique id for this processor - must be unique across the entire stream - * @param writer the writer to execute - * @param numTasks the number of instances of this writer to run concurrently - * @param connectToIds the ids of the {@link org.apache.streams.core.StreamsOperation} that this process will - * receive data from. - * @return this - */ - public StreamBuilder addStreamsPersistWriter(String persistWriterId, StreamsPersistWriter writer, int numTasks, String... connectToIds); - - /** - * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute - * {@link org.apache.streams.core.StreamsProvider:readCurrent()} to produce data. - * @param streamId unique if for this provider - must be unique across the entire stream. - * @param provider provider to execute - * @return this - */ - public StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider); - - /** - * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute - * {@link org.apache.streams.core.StreamsProvider:readCurrent()} to produce data. - * @param streamId unique if for this provider - must be unique across the entire stream. - * @param provider provider to execute - * @return this - */ - public StreamBuilder newReadCurrentStream(String streamId, StreamsProvider provider); - - /** - * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute - * {@link org.apache.streams.core.StreamsProvider:readNext(BigInteger)} to produce data. - * @param streamId unique if for this provider - must be unique across the entire stream. - * @param provider provider to execute - * @param sequence sequence to pass to {@link org.apache.streams.core.StreamsProvider:readNext(BigInteger)} method - * @return this - */ - public StreamBuilder newReadNewStream(String streamId, StreamsProvider provider, BigInteger sequence); - - /** - * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute - * {@link org.apache.streams.core.StreamsProvider:readRange(DateTime, DateTime)} to produce data. Whether the start - * and end dates are inclusive or exclusive is up to the implementation. - * @param streamId unique if for this provider - must be unique across the entire stream. - * @param provider provider to execute - * @param start start date - * @param end end date - * @return this - */ - public StreamBuilder newReadRangeStream(String streamId, StreamsProvider provider, DateTime start, DateTime end); - - /** - * Builds the stream, and starts it or submits it based on implementation. - */ - public void start(); - - /** - * Stops the streams processing. No guarantee on a smooth shutdown. Optional method, may not be implemented in - * all cases. - */ - public void stop(); + public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration); + + public StreamsConfiguration getStreamsConfiguration(); + + /** + * Add a {@link org.apache.streams.core.StreamsProcessor} to the data processing stream. + * @param processorId unique id for this processor - must be unique across the entire stream + * @param processor the processor to execute + * @param numTasks the number of instances of this processor to run concurrently + * @param connectToIds the ids of the {@link org.apache.streams.core.StreamsOperation} that this process will + * receive data from. + * @return this + */ + public StreamBuilder addStreamsProcessor(String processorId, StreamsProcessor processor, int numTasks, String... connectToIds); + + /** + * Add a {@link org.apache.streams.core.StreamsPersistWriter} to the data processing stream. + * @param persistWriterId unique id for this processor - must be unique across the entire stream + * @param writer the writer to execute + * @param numTasks the number of instances of this writer to run concurrently + * @param connectToIds the ids of the {@link org.apache.streams.core.StreamsOperation} that this process will + * receive data from. + * @return this + */ + public StreamBuilder addStreamsPersistWriter(String persistWriterId, StreamsPersistWriter writer, int numTasks, String... connectToIds); + + /** + * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute + * {@link org.apache.streams.core.StreamsProvider:readCurrent()} to produce data. + * @param streamId unique if for this provider - must be unique across the entire stream. + * @param provider provider to execute + * @return this + */ + public StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider); + + /** + * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute + * {@link org.apache.streams.core.StreamsProvider:readCurrent()} to produce data. + * @param streamId unique if for this provider - must be unique across the entire stream. + * @param provider provider to execute + * @return this + */ + public StreamBuilder newReadCurrentStream(String streamId, StreamsProvider provider); + + /** + * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute + * {@link org.apache.streams.core.StreamsProvider:readNext(BigInteger)} to produce data. + * @param streamId unique if for this provider - must be unique across the entire stream. + * @param provider provider to execute + * @param sequence sequence to pass to {@link org.apache.streams.core.StreamsProvider:readNext(BigInteger)} method + * @return this + */ + public StreamBuilder newReadNewStream(String streamId, StreamsProvider provider, BigInteger sequence); + + /** + * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute + * {@link org.apache.streams.core.StreamsProvider:readRange(DateTime, DateTime)} to produce data. Whether the start + * and end dates are inclusive or exclusive is up to the implementation. + * @param streamId unique if for this provider - must be unique across the entire stream. + * @param provider provider to execute + * @param start start date + * @param end end date + * @return this + */ + public StreamBuilder newReadRangeStream(String streamId, StreamsProvider provider, DateTime start, DateTime end); + + /** + * Builds the stream, and starts it or submits it based on implementation. + */ + public void start(); + + /** + * Stops the streams processing. No guarantee on a smooth shutdown. Optional method, may not be implemented in + * all cases. + */ + public void stop(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamHandler.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamHandler.java b/streams-core/src/main/java/org/apache/streams/core/StreamHandler.java deleted file mode 100644 index ca2e7ef..0000000 --- a/streams-core/src/main/java/org/apache/streams/core/StreamHandler.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.core; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by sblackmon on 1/6/14. - */ -public class StreamHandler { - - private static final Logger LOGGER = LoggerFactory.getLogger(StreamHandler.class); - - private volatile StreamState state; - - public void setState(StreamState state) { - this.state = state; - } - - public StreamState getState() { - return this.state; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamState.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamState.java b/streams-core/src/main/java/org/apache/streams/core/StreamState.java deleted file mode 100644 index 0c24d29..0000000 --- a/streams-core/src/main/java/org/apache/streams/core/StreamState.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.core; - -public enum StreamState { - RUNNING, //Stream is currently connected and running - STOPPED, // Stream has been shut down and is stopped - CONNECTING, //Stream is attempting to connect to server - SHUTTING_DOWN, //Stream has initialized shutdown - DISCONNECTED //Stream has unintentionally lost connection -} - http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java index 8367631..27f3d85 100644 --- a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java +++ b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java @@ -19,6 +19,7 @@ package org.apache.streams.core; import org.apache.streams.pojo.json.Activity; + import org.joda.time.DateTime; import java.io.Serializable; @@ -28,142 +29,139 @@ import java.util.Map; public class StreamsDatum implements Serializable { - public StreamsDatum(Object document) { - this(document, null, null, null, new HashMap<>()); - } - - public StreamsDatum(Object document, String id) { - this(document, id, null, null, new HashMap<>()); - } - - public StreamsDatum(Object document, BigInteger sequenceid) { - this(document, null, null, sequenceid); - } - - public StreamsDatum(Object document, DateTime timestamp) { - this(document, null, timestamp, null); - } - - public StreamsDatum(Object document, DateTime timestamp, BigInteger sequenceid) { - this(document, null, timestamp, sequenceid); - } - - public StreamsDatum(Object document, DateTime timestamp, Map<String, Object> metadata) { - this(document, null, timestamp, null, metadata); - } - - public StreamsDatum(Object document, String id, DateTime timestamp) { - this(document, id, timestamp, null, new HashMap<>()); - } - - public StreamsDatum(Object document, String id, Map<String, Object> metadata) { - this(document, id, null, null, metadata); - } - - public StreamsDatum(Object document, String id, BigInteger sequenceid, Map<String, Object> metadata) { - this(document, id, null, sequenceid, metadata); - } - - public StreamsDatum(Object document, String id, BigInteger sequenceid) { - this(document, id, sequenceid, new HashMap<>()); - } - - public StreamsDatum(Object document, String id, DateTime timestamp, BigInteger sequenceid) { - this.document = document; - this.id = id; - this.timestamp = timestamp; - this.sequenceid = sequenceid; - this.metadata = new HashMap<>(); - } - - public StreamsDatum(Object document, String id, DateTime timestamp, BigInteger sequenceid, Map<String, Object> metadata) { - this.document = document; - this.id = id; - this.timestamp = timestamp; - this.sequenceid = sequenceid; - this.metadata = metadata; - } - - public DateTime timestamp; + public StreamsDatum(Object document) { + this(document, null, null, null, new HashMap<>()); + } - public BigInteger sequenceid; + public StreamsDatum(Object document, String id) { + this(document, id, null, null, new HashMap<>()); + } - public Map<String, Object> metadata; + public StreamsDatum(Object document, BigInteger sequenceid) { + this(document, null, null, sequenceid); + } - public Object document; + public StreamsDatum(Object document, DateTime timestamp) { + this(document, null, timestamp, null); + } - private String id; + public StreamsDatum(Object document, DateTime timestamp, BigInteger sequenceid) { + this(document, null, timestamp, sequenceid); + } + + public StreamsDatum(Object document, DateTime timestamp, Map<String, Object> metadata) { + this(document, null, timestamp, null, metadata); + } - public DateTime getTimestamp() { - return timestamp; - } - - public void setTimestamp(DateTime timestamp) { - this.timestamp = timestamp; - } - - public BigInteger getSequenceid() { - return sequenceid; - } - - public void setSequenceid(BigInteger sequenceid) { - this.sequenceid = sequenceid; - } - - public Map<String, Object> getMetadata() { - return metadata; - } - - public void setMetadata(Map<String, Object> metadata) { - this.metadata = metadata; - } - - public Object getDocument() { - return document; - } - - public void setDocument(Object document) { - this.document = document; - } - - - public String getId(){ - if(this.id == null && this.document instanceof Activity) { - return ((Activity)this.document).getId(); - } - return id; - } - - public void setId(String id) { - this.id = id; - } - - @Override - public boolean equals(Object o) { - if(o instanceof StreamsDatum) { - StreamsDatum that = (StreamsDatum) o; - if(this.document != null && this.document.equals(that.document)) { - return (this.timestamp != null ? this.timestamp.equals(that.timestamp) : that.timestamp == null) && - (this.sequenceid != null ? this.sequenceid.equals(that.sequenceid) : that.sequenceid == null); - } - else { - return that.document == null && this.document == null; - } - } - else { - return false; - } - } - - @Override - public String toString() { - return "StreamsDatum{" + - "timestamp=" + timestamp + - ", sequenceid=" + sequenceid + - ", metadata=" + metadata + - ", document=" + document + - ", id='" + id + '\'' + - '}'; - } + public StreamsDatum(Object document, String id, DateTime timestamp) { + this(document, id, timestamp, null, new HashMap<>()); + } + + public StreamsDatum(Object document, String id, Map<String, Object> metadata) { + this(document, id, null, null, metadata); + } + + public StreamsDatum(Object document, String id, BigInteger sequenceid, Map<String, Object> metadata) { + this(document, id, null, sequenceid, metadata); + } + + public StreamsDatum(Object document, String id, BigInteger sequenceid) { + this(document, id, sequenceid, new HashMap<>()); + } + + public StreamsDatum(Object document, String id, DateTime timestamp, BigInteger sequenceid) { + this.document = document; + this.id = id; + this.timestamp = timestamp; + this.sequenceid = sequenceid; + this.metadata = new HashMap<>(); + } + + public StreamsDatum(Object document, String id, DateTime timestamp, BigInteger sequenceid, Map<String, Object> metadata) { + this.document = document; + this.id = id; + this.timestamp = timestamp; + this.sequenceid = sequenceid; + this.metadata = metadata; + } + + public DateTime timestamp; + + public BigInteger sequenceid; + + public Map<String, Object> metadata; + + public Object document; + + private String id; + + public DateTime getTimestamp() { + return timestamp; + } + + public void setTimestamp(DateTime timestamp) { + this.timestamp = timestamp; + } + + public BigInteger getSequenceid() { + return sequenceid; + } + + public void setSequenceid(BigInteger sequenceid) { + this.sequenceid = sequenceid; + } + + public Map<String, Object> getMetadata() { + return metadata; + } + + public void setMetadata(Map<String, Object> metadata) { + this.metadata = metadata; + } + + public Object getDocument() { + return document; + } + + public void setDocument(Object document) { + this.document = document; + } + + public String getId() { + if (this.id == null && this.document instanceof Activity) { + return ((Activity)this.document).getId(); + } + return id; + } + + public void setId(String id) { + this.id = id; + } + + @Override + public boolean equals(Object configurationObject) { + if (configurationObject instanceof StreamsDatum) { + StreamsDatum that = (StreamsDatum) configurationObject; + if (this.document != null && this.document.equals(that.document)) { + return (this.timestamp != null ? this.timestamp.equals(that.timestamp) : that.timestamp == null) + && (this.sequenceid != null ? this.sequenceid.equals(that.sequenceid) : that.sequenceid == null); + } else { + return that.document == null && this.document == null; + } + } else { + return false; + } + } + + @Override + public String toString() { + return "StreamsDatum{" + + "timestamp=" + timestamp + + ", sequenceid=" + sequenceid + + ", metadata=" + metadata + + ", document=" + document + + ", id='" + id + '\'' + + '}'; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsFilter.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsFilter.java b/streams-core/src/main/java/org/apache/streams/core/StreamsFilter.java deleted file mode 100644 index 929b26f..0000000 --- a/streams-core/src/main/java/org/apache/streams/core/StreamsFilter.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.core; - -import java.util.Queue; - -public interface StreamsFilter { - - void start(); - void stop(); - - public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue); - public Queue<StreamsDatum> getProcessorInputQueue(); - - public void setProcessorOutputQueue(Queue<StreamsDatum> outputQueue); - public Queue<StreamsDatum> getProcessorOutputQueue(); - - public boolean filter(StreamsDatum entry); - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java b/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java index f5422a5..490f454 100644 --- a/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java +++ b/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java @@ -21,26 +21,26 @@ package org.apache.streams.core; import java.io.Serializable; /** - * + * Each step in a pipeline is a StreamsOperation. */ public interface StreamsOperation extends Serializable { - /** - * Each operation must publish an identifier. - */ - public String getId(); + /** + * Each operation must publish an identifier. + */ + public String getId(); - /** - * This method will be called after initialization/serialization. Initialize any non-serializable objects here. - * @param configurationObject Any object to help intialize the operation. ie. Map, JobContext, Properties, etc. The type - * will be based on where the operation is being run (ie. hadoop, storm, locally, etc.) - */ - public void prepare(Object configurationObject); + /** + * This method will be called after initialization/serialization. Initialize any non-serializable objects here. + * @param configurationObject Any object to help intialize the operation. ie. Map, JobContext, Properties, etc. The type + * will be based on where the operation is being run (ie. hadoop, storm, locally, etc.) + */ + public void prepare(Object configurationObject); - /** - * No guarantee that this method will ever be called. But upon shutdown of the stream, an attempt to call this method - * will be made. - * Use this method to terminate connections, etc. - */ - public void cleanUp(); + /** + * No guarantee that this method will ever be called. But upon shutdown of the stream, an attempt to call this method + * will be made. + * Use this method to terminate connections, etc. + */ + public void cleanUp(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java b/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java index 1a6b0d8..64063ac 100644 --- a/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java +++ b/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java @@ -23,16 +23,14 @@ import org.joda.time.DateTime; import java.math.BigInteger; /** - * * Currently a duplicate interface. Has exact same methods as StreamsProvider. - * Future work should make this interface necessary I'm told. - * */ public interface StreamsPersistReader extends StreamsProvider { - StreamsResultSet readAll(); - StreamsResultSet readNew(BigInteger sequence); - StreamsResultSet readRange(DateTime start, DateTime end); + StreamsResultSet readAll(); + + StreamsResultSet readNew(BigInteger sequence); + StreamsResultSet readRange(DateTime start, DateTime end); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java b/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java index 1af62e7..59797e4 100644 --- a/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java +++ b/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java @@ -18,12 +18,15 @@ package org.apache.streams.core; -public interface StreamsPersistWriter extends StreamsOperation{ +/** + * StreamsOperation for writing data out of a pipeline. + */ +public interface StreamsPersistWriter extends StreamsOperation { - /** - * Persist the StreamsDatum to the corresponding data store. - * @param entry to be stored. - */ - void write( StreamsDatum entry ); + /** + * Persist the StreamsDatum to the corresponding data store. + * @param entry to be stored. + */ + void write( StreamsDatum entry ); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java b/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java index b63e2d9..25e1a07 100644 --- a/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java +++ b/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java @@ -20,14 +20,14 @@ package org.apache.streams.core; import java.util.List; -public interface StreamsProcessor extends StreamsOperation{ +public interface StreamsProcessor extends StreamsOperation { - /** - * Process/Analyze the {@link org.apache.streams.core.StreamsDatum} and return the the StreamsDatums that will - * passed to every down stream operation that reads from this processor. - * @param entry StreamsDatum to be process - * @return resulting StreamDatums from process. Should never be null or contain null object. Empty list OK. - */ - List<StreamsDatum> process( StreamsDatum entry ); + /** + * Process/Analyze the {@link org.apache.streams.core.StreamsDatum} and return the the StreamsDatums that will + * passed to every down stream operation that reads from this processor. + * @param entry StreamsDatum to be processed + * @return resulting StreamDatums from processing. Should never be null or contain null object. Empty list OK. + */ + List<StreamsDatum> process( StreamsDatum entry ); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java b/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java index 7c27e34..2547343 100644 --- a/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java +++ b/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java @@ -18,45 +18,46 @@ package org.apache.streams.core; -import java.math.BigInteger; import org.joda.time.DateTime; +import java.math.BigInteger; + /** * A StreamsProvider represents the entry point into the Streams pipeline. Providers are responsible for inserting * data into the pipeline in discrete result sets. */ public interface StreamsProvider extends StreamsOperation { - /** - * Start the operation of the stream - */ - void startStream(); - - /** - * Read the current items available from the provider - * @return a non-null {@link org.apache.streams.core.StreamsResultSet} - */ - StreamsResultSet readCurrent(); - - /** - * TODO: Define how this operates or eliminate - * @param sequence - * @return {@link StreamsResultSet} - */ - StreamsResultSet readNew(BigInteger sequence); - - /** - * TODO: Define how this operates or eliminate - * @param start - * @param end - * @return {@link StreamsResultSet} - */ - StreamsResultSet readRange(DateTime start, DateTime end); - - /** - * Flag to indicate whether the provider is producing data - * @return true if the processor is actively awaiting or producing data. False otherwise. - */ - boolean isRunning(); + /** + * Start the operation of the stream. + */ + void startStream(); + + /** + * Read the current items available from the provider. + * @return a non-null {@link org.apache.streams.core.StreamsResultSet} + */ + StreamsResultSet readCurrent(); + + /** + * Read data with sequenceId greater than sequence. + * @param sequence BigInteger sequence + * @return {@link StreamsResultSet} + */ + StreamsResultSet readNew(BigInteger sequence); + + /** + * Read data with event time between start DateTime and end DateTime. + * @param start start DateTime + * @param end end DateTime + * @return {@link StreamsResultSet} + */ + StreamsResultSet readRange(DateTime start, DateTime end); + + /** + * Flag to indicate whether the provider is still producing data. + * @return true if the processor is actively awaiting or producing data. False otherwise. + */ + boolean isRunning(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsResultSet.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsResultSet.java b/streams-core/src/main/java/org/apache/streams/core/StreamsResultSet.java index 1e1cbe2..d64f27e 100644 --- a/streams-core/src/main/java/org/apache/streams/core/StreamsResultSet.java +++ b/streams-core/src/main/java/org/apache/streams/core/StreamsResultSet.java @@ -21,40 +21,43 @@ package org.apache.streams.core; import java.util.Iterator; import java.util.Queue; +/** + * StreamsResultSet is a wrapper for an Iterator around a set of StreamsDatum. + */ public class StreamsResultSet implements Iterable<StreamsDatum> { - Queue<StreamsDatum> queue; + Queue<StreamsDatum> queue; - DatumStatusCounter counter; + DatumStatusCounter counter; - public StreamsResultSet(Queue<StreamsDatum> queue) { - this.queue = queue; - } + public StreamsResultSet(Queue<StreamsDatum> queue) { + this.queue = queue; + } - @Override - public Iterator<StreamsDatum> iterator() { - return queue.iterator(); - } + @Override + public Iterator<StreamsDatum> iterator() { + return queue.iterator(); + } - public int size() { - return queue.size(); - } + public int size() { + return queue.size(); + } - public Queue<StreamsDatum> getQueue() { - return queue; - } + public Queue<StreamsDatum> getQueue() { + return queue; + } - public void setQueue(Queue<StreamsDatum> queue) { - this.queue = queue; - } + public void setQueue(Queue<StreamsDatum> queue) { + this.queue = queue; + } - public DatumStatusCounter getCounter() { - return counter; - } + public DatumStatusCounter getCounter() { + return counter; + } - public void setCounter(DatumStatusCounter counter) { - this.counter = counter; - } + public void setCounter(DatumStatusCounter counter) { + this.counter = counter; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java b/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java index a229413..7d80098 100644 --- a/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java +++ b/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java @@ -21,39 +21,40 @@ package org.apache.streams.core.util; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsOperation; + import org.joda.time.DateTime; import java.util.HashMap; import java.util.Map; /** - * Provides common utilities for managing and manipulating StreamsDatums + * Provides common utilities for managing and manipulating StreamsDatums. */ public class DatumUtils { - /** - * Adds an error occurred during a StreamsOperation step to the StreamsDatum's metadata. By convention, errors are - * placed in the metadata under the "errors" and are organized by class simple name where the failure occurred. - * - * @param datum the datum on which the operation step errored - * @param e the error encountered - * @param operationClass the class of the operation - */ - @SuppressWarnings("all") - public static void addErrorToMetadata(StreamsDatum datum, Throwable e, Class<? extends StreamsOperation> operationClass) { - if(!datum.getMetadata().containsKey("errors")) { - datum.getMetadata().put("errors", new HashMap<String, Throwable>()); - } - Map<String, Throwable> errors = (Map)datum.getMetadata().get("errors"); - errors.put(operationClass.getCanonicalName(), e); + /** + * Adds an error occurred during a StreamsOperation step to the StreamsDatum's metadata. By convention, errors are + * placed in the metadata under the "errors" and are organized by class simple name where the failure occurred. + * + * @param datum the datum on which the operation step errored + * @param throwable the throwable encountered + * @param operationClass the class of the operation + */ + @SuppressWarnings("all") + public static void addErrorToMetadata(StreamsDatum datum, Throwable throwable, Class<? extends StreamsOperation> operationClass) { + if (!datum.getMetadata().containsKey("errors")) { + datum.getMetadata().put("errors", new HashMap<String, Throwable>()); } + Map<String, Throwable> errors = (Map)datum.getMetadata().get("errors"); + errors.put(operationClass.getCanonicalName(), throwable); + } - public static StreamsDatum cloneDatum(StreamsDatum datum) { - StreamsDatum clone = new StreamsDatum(datum.getDocument()); - clone.setId(datum.getId() == null ? null : datum.getId()); - clone.setTimestamp(datum.getTimestamp() == null ? null : new DateTime(datum.getTimestamp())); - clone.setSequenceid(datum.getSequenceid() == null ? null : datum.getSequenceid()); - clone.setMetadata(datum.getMetadata() == null ? null : new HashMap<>(datum.getMetadata())); - return clone; - } + public static StreamsDatum cloneDatum(StreamsDatum datum) { + StreamsDatum clone = new StreamsDatum(datum.getDocument()); + clone.setId(datum.getId() == null ? null : datum.getId()); + clone.setTimestamp(datum.getTimestamp() == null ? null : new DateTime(datum.getTimestamp())); + clone.setSequenceid(datum.getSequenceid() == null ? null : datum.getSequenceid()); + clone.setMetadata(datum.getMetadata() == null ? null : new HashMap<>(datum.getMetadata())); + return clone; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java index 8bfa28b..f13a44f 100644 --- a/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java +++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java @@ -15,62 +15,67 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.jackson; +import org.apache.streams.pojo.json.DatumStatusCounterBroadcast; + import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonNode; -import org.apache.streams.pojo.json.DatumStatusCounterBroadcast; import org.slf4j.Logger; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.Arrays; import javax.management.MBeanAttributeInfo; import javax.management.MBeanInfo; import javax.management.MBeanServer; import javax.management.ObjectName; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.util.Arrays; public class DatumStatusCounterDeserializer extends JsonDeserializer<DatumStatusCounterBroadcast> { - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DatumStatusCounterDeserializer.class); - public DatumStatusCounterDeserializer() { + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DatumStatusCounterDeserializer.class); - } + public DatumStatusCounterDeserializer() { - @Override - public DatumStatusCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException { - try { - MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + } - DatumStatusCounterBroadcast datumStatusCounterBroadcast = new DatumStatusCounterBroadcast(); - JsonNode attributes = jsonParser.getCodec().readTree(jsonParser); + @Override + public DatumStatusCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException { + try { + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - ObjectName name = new ObjectName(attributes.get("canonicalName").asText()); - MBeanInfo info = server.getMBeanInfo(name); - datumStatusCounterBroadcast.setName(name.toString()); + DatumStatusCounterBroadcast datumStatusCounterBroadcast = new DatumStatusCounterBroadcast(); + JsonNode attributes = jsonParser.getCodec().readTree(jsonParser); - for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) { - try { - switch(attribute.getName()) { - case "Failed": - datumStatusCounterBroadcast.setFailed((boolean) server.getAttribute(name, attribute.getName())); - break; - case "Passed": - datumStatusCounterBroadcast.setPassed((boolean) server.getAttribute(name, attribute.getName())); - break; - } - } catch (Exception e) { - LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object: {}", e); - } - } + ObjectName name = new ObjectName(attributes.get("canonicalName").asText()); + MBeanInfo info = server.getMBeanInfo(name); + datumStatusCounterBroadcast.setName(name.toString()); - return datumStatusCounterBroadcast; - } catch (Exception e) { - LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object: {}", e); - return null; + for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) { + try { + switch (attribute.getName()) { + case "Failed": + datumStatusCounterBroadcast.setFailed((boolean) server.getAttribute(name, attribute.getName())); + break; + case "Passed": + datumStatusCounterBroadcast.setPassed((boolean) server.getAttribute(name, attribute.getName())); + break; + default: + break; + } + } catch (Exception ex) { + LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object: {}", ex); } + } + + return datumStatusCounterBroadcast; + } catch (Exception ex) { + LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object: {}", ex); + return null; } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java index 43c9239..8259340 100644 --- a/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java +++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java @@ -15,65 +15,70 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.jackson; +import org.apache.streams.pojo.json.MemoryUsageBroadcast; + import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonNode; -import org.apache.streams.pojo.json.MemoryUsageBroadcast; import org.slf4j.Logger; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.Arrays; import javax.management.MBeanAttributeInfo; import javax.management.MBeanInfo; import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.openmbean.CompositeDataSupport; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.util.Arrays; public class MemoryUsageDeserializer extends JsonDeserializer<MemoryUsageBroadcast> { - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MemoryUsageDeserializer.class); - public MemoryUsageDeserializer() { + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MemoryUsageDeserializer.class); - } + public MemoryUsageDeserializer() { - @Override - public MemoryUsageBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException { - try { - MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + } - MemoryUsageBroadcast memoryUsageBroadcast = new MemoryUsageBroadcast(); - JsonNode attributes = jsonParser.getCodec().readTree(jsonParser); + @Override + public MemoryUsageBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException { + try { + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - ObjectName name = new ObjectName(attributes.get("canonicalName").asText()); - MBeanInfo info = server.getMBeanInfo(name); - memoryUsageBroadcast.setName(name.toString()); + MemoryUsageBroadcast memoryUsageBroadcast = new MemoryUsageBroadcast(); + JsonNode attributes = jsonParser.getCodec().readTree(jsonParser); - for(MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) { - switch(attribute.getName()) { - case "Verbose": - memoryUsageBroadcast.setVerbose((boolean) server.getAttribute(name, attribute.getName())); - break; - case "ObjectPendingFinalizationCount": - memoryUsageBroadcast.setObjectPendingFinalizationCount(Long.parseLong(server.getAttribute(name, attribute.getName()).toString())); - break; - case "HeapMemoryUsage": - memoryUsageBroadcast.setHeapMemoryUsage((Long) ((CompositeDataSupport)server.getAttribute(name, attribute.getName())).get("used")); - break; - case "NonHeapMemoryUsage": - memoryUsageBroadcast.setNonHeapMemoryUsage((Long) ((CompositeDataSupport)server.getAttribute(name, attribute.getName())).get("used")); - break; - } - } + ObjectName name = new ObjectName(attributes.get("canonicalName").asText()); + MBeanInfo info = server.getMBeanInfo(name); + memoryUsageBroadcast.setName(name.toString()); - return memoryUsageBroadcast; - } catch (Exception e) { - LOGGER.error("Exception trying to deserialize MemoryUsageDeserializer object: {}", e); - return null; + for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) { + switch (attribute.getName()) { + case "Verbose": + memoryUsageBroadcast.setVerbose((boolean) server.getAttribute(name, attribute.getName())); + break; + case "ObjectPendingFinalizationCount": + memoryUsageBroadcast.setObjectPendingFinalizationCount(Long.parseLong(server.getAttribute(name, attribute.getName()).toString())); + break; + case "HeapMemoryUsage": + memoryUsageBroadcast.setHeapMemoryUsage((Long) ((CompositeDataSupport)server.getAttribute(name, attribute.getName())).get("used")); + break; + case "NonHeapMemoryUsage": + memoryUsageBroadcast.setNonHeapMemoryUsage((Long) ((CompositeDataSupport)server.getAttribute(name, attribute.getName())).get("used")); + break; + default: + break; } + } + + return memoryUsageBroadcast; + } catch (Exception ex) { + LOGGER.error("Exception trying to deserialize MemoryUsageDeserializer object: {}", ex); + return null; } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java index 8b65bf3..e5f5dcb 100644 --- a/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java +++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java @@ -15,74 +15,82 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.jackson; +import org.apache.streams.pojo.json.StreamsTaskCounterBroadcast; + import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonNode; -import org.apache.streams.pojo.json.StreamsTaskCounterBroadcast; import org.slf4j.Logger; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.Arrays; import javax.management.MBeanAttributeInfo; import javax.management.MBeanInfo; import javax.management.MBeanServer; import javax.management.ObjectName; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.util.Arrays; +/** + * StreamsTaskCounterDeserializer: a JsonDeserializer for StreamsTaskCounterBroadcast. + */ public class StreamsTaskCounterDeserializer extends JsonDeserializer<StreamsTaskCounterBroadcast> { - private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsTaskCounterDeserializer.class); - public StreamsTaskCounterDeserializer() { + private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsTaskCounterDeserializer.class); - } + public StreamsTaskCounterDeserializer() { - @Override - public StreamsTaskCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException { - try { - MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + } - StreamsTaskCounterBroadcast streamsTaskCounterBroadcast = new StreamsTaskCounterBroadcast(); - JsonNode attributes = jsonParser.getCodec().readTree(jsonParser); + @Override + public StreamsTaskCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException { + try { + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - ObjectName name = new ObjectName(attributes.get("canonicalName").asText()); - MBeanInfo info = server.getMBeanInfo(name); - streamsTaskCounterBroadcast.setName(name.toString()); + StreamsTaskCounterBroadcast streamsTaskCounterBroadcast = new StreamsTaskCounterBroadcast(); + JsonNode attributes = jsonParser.getCodec().readTree(jsonParser); - for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) { - try { - switch (attribute.getName()) { - case "ErrorRate": - streamsTaskCounterBroadcast.setErrorRate((double) server.getAttribute(name, attribute.getName())); - break; - case "NumEmitted": - streamsTaskCounterBroadcast.setNumEmitted((long) server.getAttribute(name, attribute.getName())); - break; - case "NumReceived": - streamsTaskCounterBroadcast.setNumReceived((long) server.getAttribute(name, attribute.getName())); - break; - case "NumUnhandledErrors": - streamsTaskCounterBroadcast.setNumUnhandledErrors((long) server.getAttribute(name, attribute.getName())); - break; - case "AvgTime": - streamsTaskCounterBroadcast.setAvgTime((double) server.getAttribute(name, attribute.getName())); - break; - case "MaxTime": - streamsTaskCounterBroadcast.setMaxTime((long) server.getAttribute(name, attribute.getName())); - break; - } - } catch (Exception e) { - LOGGER.error("Exception while trying to deserialize StreamsTaskCounterBroadcast object: {}", e); - } - } + ObjectName name = new ObjectName(attributes.get("canonicalName").asText()); + MBeanInfo info = server.getMBeanInfo(name); + streamsTaskCounterBroadcast.setName(name.toString()); - return streamsTaskCounterBroadcast; - } catch (Exception e) { - LOGGER.error("Exception while trying to deserialize StreamsTaskCounterBroadcast object: {}", e); - return null; + for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) { + try { + switch (attribute.getName()) { + case "ErrorRate": + streamsTaskCounterBroadcast.setErrorRate((double) server.getAttribute(name, attribute.getName())); + break; + case "NumEmitted": + streamsTaskCounterBroadcast.setNumEmitted((long) server.getAttribute(name, attribute.getName())); + break; + case "NumReceived": + streamsTaskCounterBroadcast.setNumReceived((long) server.getAttribute(name, attribute.getName())); + break; + case "NumUnhandledErrors": + streamsTaskCounterBroadcast.setNumUnhandledErrors((long) server.getAttribute(name, attribute.getName())); + break; + case "AvgTime": + streamsTaskCounterBroadcast.setAvgTime((double) server.getAttribute(name, attribute.getName())); + break; + case "MaxTime": + streamsTaskCounterBroadcast.setMaxTime((long) server.getAttribute(name, attribute.getName())); + break; + default: + break; + } + } catch (Exception ex) { + LOGGER.error("Exception while trying to deserialize StreamsTaskCounterBroadcast object: {}", ex); } + } + + return streamsTaskCounterBroadcast; + } catch (Exception ex) { + LOGGER.error("Exception while trying to deserialize StreamsTaskCounterBroadcast object: {}", ex); + return null; } + } }
