STREAMS-216 | Fixed unit tests and hardened LocalStreamBuilder, BaseStreamsTask, and BroadcastMonitorThread against NPEs
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a20f01ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a20f01ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a20f01ae Branch: refs/heads/master Commit: a20f01aefc5904ecb857fdb9a344023cf6a05100 Parents: fb8f9d2 Author: Robert Douglas <[email protected]> Authored: Tue Nov 18 11:24:04 2014 -0600 Committer: Robert Douglas <[email protected]> Committed: Tue Nov 18 11:24:04 2014 -0600 ---------------------------------------------------------------------- .../tasks/BroadcastMonitorThread.java | 4 ++-- .../local/builders/LocalStreamBuilder.java | 3 ++- .../streams/local/tasks/BaseStreamsTask.java | 6 ++++-- .../local/builders/LocalStreamBuilderTest.java | 9 +++++--- .../local/counters/DatumStatusCounterTest.java | 22 ++++++++++---------- .../queues/ThroughputQueueSingleThreadTest.java | 15 ++++++------- 6 files changed, 33 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java index 10b60b1..fd9354a 100644 --- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java +++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java @@ -141,8 +141,8 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple if (streamConfig != null && streamConfig.containsKey("monitoring_broadcast_interval_ms") && streamConfig.get("monitoring_broadcast_interval_ms") != null && - streamConfig.get("monitoring_broadcast_interval_ms") instanceof Long || - streamConfig.get("monitoring_broadcast_interval_ms") instanceof Integer) { + (streamConfig.get("monitoring_broadcast_interval_ms") instanceof Long || + streamConfig.get("monitoring_broadcast_interval_ms") instanceof Integer)) { waitTime = Long.parseLong(streamConfig.get("monitoring_broadcast_interval_ms").toString()); } else { waitTime = DEFAULT_WAIT_TIME; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index 19d50e1..a9afc3c 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -420,7 +420,8 @@ public class LocalStreamBuilder implements StreamBuilder { } private void setStreamIdentifier() { - if(streamConfig.containsKey(STREAM_IDENTIFIER_KEY) && + if(streamConfig != null && + streamConfig.containsKey(STREAM_IDENTIFIER_KEY) && streamConfig.get(STREAM_IDENTIFIER_KEY) != null && streamConfig.get(STREAM_IDENTIFIER_KEY).toString().length() > 0) { this.streamIdentifier = streamConfig.get(STREAM_IDENTIFIER_KEY).toString(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java index cfb231d..9726963 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java @@ -205,7 +205,8 @@ public abstract class BaseStreamsTask implements StreamsTask { } public void setStartedAt() { - if(streamConfig.containsKey(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) && + if(streamConfig != null && + streamConfig.containsKey(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) && streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) != null && streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) instanceof Long) { this.startedAt = Long.parseLong(streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY).toString()); @@ -219,7 +220,8 @@ public abstract class BaseStreamsTask implements StreamsTask { } public void setStreamIdentifier() { - if(streamConfig.containsKey(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) && + if(streamConfig != null && + streamConfig.containsKey(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) && streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) != null && streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY).toString().length() > 0) { this.streamIdentifier = streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY).toString(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java index a675d87..ed67003 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java @@ -52,6 +52,7 @@ import org.apache.streams.local.test.providers.NumericMessageProvider; import org.apache.streams.local.test.writer.DatumCounterWriter; import org.apache.streams.local.test.writer.SystemOutWriter; import org.apache.streams.util.ComponentUtils; +import org.joda.time.DateTime; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -74,7 +75,9 @@ import javax.management.*; * */ public class LocalStreamBuilderTest extends RandomizedTest { - + private static final String MBEAN_ID = "test_id"; + private static final String STREAM_ID = "test_stream"; + private static long STREAM_START_TIME = (new DateTime()).getMillis(); @After public void removeLocalMBeans() { @@ -90,12 +93,12 @@ public class LocalStreamBuilderTest extends RandomizedTest { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); for(String id : ids) { try { - mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id))); + mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME))); } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) { //No-op } try { - mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id)))); + mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME)))); } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) { //No-op } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java index 3a9a8dc..9775c6f 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java @@ -19,6 +19,7 @@ package org.apache.streams.local.counters; import com.carrotsearch.randomizedtesting.RandomizedTest; import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.joda.time.DateTime; import org.junit.After; import org.junit.Test; @@ -32,7 +33,8 @@ import java.lang.management.ManagementFactory; public class DatumStatusCounterTest extends RandomizedTest { private static final String MBEAN_ID = "test_id"; - + private static final String STREAM_ID = "test_stream"; + private static long STREAM_START_TIME = (new DateTime()).getMillis(); /** @@ -42,7 +44,7 @@ public class DatumStatusCounterTest extends RandomizedTest { @After public void unregisterMXBean() throws Exception { try { - ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID))); + ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME))); } catch (InstanceNotFoundException ife) { //No-op } @@ -54,7 +56,7 @@ public class DatumStatusCounterTest extends RandomizedTest { @Test public void testConstructor() { try { - new DatumStatusCounter(MBEAN_ID); + new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); } catch (Throwable t) { fail("Constructor Threw Exception : "+t.getMessage()); } @@ -67,7 +69,7 @@ public class DatumStatusCounterTest extends RandomizedTest { @Test @Repeat(iterations = 3) public void testPassed() throws Exception { - DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID); + DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); int numIncrements = randomIntBetween(1, 100000); for(int i=0; i < numIncrements; ++i) { counter.incrementPassedCount(); @@ -76,7 +78,7 @@ public class DatumStatusCounterTest extends RandomizedTest { unregisterMXBean(); - counter = new DatumStatusCounter(MBEAN_ID); + counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); numIncrements = randomIntBetween(1, 100000); long total = 0; for(int i=0; i < numIncrements; ++i) { @@ -94,7 +96,7 @@ public class DatumStatusCounterTest extends RandomizedTest { @Test @Repeat(iterations = 3) public void testFailed() throws Exception { - DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID); + DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); int numIncrements = randomIntBetween(1, 100000); for(int i=0; i < numIncrements; ++i) { counter.incrementFailedCount(); @@ -103,7 +105,7 @@ public class DatumStatusCounterTest extends RandomizedTest { unregisterMXBean(); - counter = new DatumStatusCounter(MBEAN_ID); + counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); numIncrements = randomIntBetween(1, 100000); long total = 0; for(int i=0; i < numIncrements; ++i) { @@ -121,7 +123,7 @@ public class DatumStatusCounterTest extends RandomizedTest { @Test @Repeat(iterations = 3) public void testFailureRate() { - DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID); + DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); assertEquals(0.0, counter.getFailRate(), 0); int failures = randomIntBetween(0, 100000); int passes = randomIntBetween(0, 100000); @@ -129,6 +131,4 @@ public class DatumStatusCounterTest extends RandomizedTest { counter.incrementFailedCount(failures); assertEquals((double)failures / (double)(passes + failures), counter.getFailRate(), 0); } - - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a20f01ae/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java index 2492161..ef669f4 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java @@ -20,6 +20,7 @@ package org.apache.streams.local.queues; import com.carrotsearch.randomizedtesting.RandomizedTest; import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.apache.streams.util.ComponentUtils; +import org.joda.time.DateTime; import org.junit.After; import org.junit.Test; @@ -35,7 +36,9 @@ import static org.junit.Assert.assertEquals; * Single thread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue} */ public class ThroughputQueueSingleThreadTest extends RandomizedTest { - + private static final String MBEAN_ID = "test_id"; + private static final String STREAM_ID = "test_stream"; + private static long STREAM_START_TIME = (new DateTime()).getMillis(); @After public void removeLocalMBeans() { @@ -208,10 +211,9 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest { try { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); Integer beanCount = mbs.getMBeanCount(); - String id = "testQueue"; - ThroughputQueue queue = new ThroughputQueue(id); + ThroughputQueue queue = new ThroughputQueue(MBEAN_ID, STREAM_ID, STREAM_START_TIME); assertEquals("Expected bean to be registered", new Integer(beanCount+1), mbs.getMBeanCount()); - ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id))); + ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME))); assertNotNull(mBean); } catch (Exception e) { fail("Failed to register MXBean : "+e.getMessage()); @@ -226,12 +228,11 @@ public class ThroughputQueueSingleThreadTest extends RandomizedTest { try { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); Integer beanCount = mbs.getMBeanCount(); - String id = "testQueue"; int numReg = randomIntBetween(2, 100); for(int i=0; i < numReg; ++i) { - ThroughputQueue queue = new ThroughputQueue(id+i); + ThroughputQueue queue = new ThroughputQueue(MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME); assertEquals("Expected bean to be registered", new Integer(beanCount + (i+1)), mbs.getMBeanCount()); - ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id+i))); + ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME))); assertNotNull(mBean); } } catch (Exception e) {
