Fixed tests by unregistering previously registered MBeans
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d71568dc Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d71568dc Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d71568dc Branch: refs/heads/blueprints Commit: d71568dc72716d8d6b10ef0de74df5325f3d1336 Parents: 7e65a42 Author: Ryan Ebanks <[email protected]> Authored: Mon Oct 20 15:30:28 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Mon Oct 20 15:30:28 2014 -0500 ---------------------------------------------------------------------- .../local/builders/LocalStreamBuilder.java | 5 +++ .../local/builders/LocalStreamBuilderTest.java | 32 +++++++++++++++++--- 2 files changed, 32 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d71568dc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index 57f3aa4..2161638 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -20,6 +20,7 @@ package org.apache.streams.local.builders; import org.apache.log4j.spi.LoggerFactory; import org.apache.streams.core.*; +import org.apache.streams.local.counters.StreamsTaskCounter; import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor; import org.apache.streams.local.queues.ThroughputQueue; import 
org.apache.streams.local.tasks.LocalStreamProcessMonitorThread; @@ -271,6 +272,8 @@ public class LocalStreamBuilder implements StreamBuilder { for(StreamComponent prov : this.providers.values()) { StreamsTask task = prov.createConnectedTask(getTimeout()); task.setStreamConfig(this.streamConfig); + StreamsTaskCounter counter = new StreamsTaskCounter(prov.getId()); + task.setStreamsTaskCounter(counter); this.executor.submit(task); provTasks.put(prov.getId(), (StreamsProviderTask) task); if( prov.isOperationCountable() ) { @@ -284,8 +287,10 @@ public class LocalStreamBuilder implements StreamBuilder { for(StreamComponent comp : this.components.values()) { int tasks = comp.getNumTasks(); List<StreamsTask> compTasks = new LinkedList<StreamsTask>(); + StreamsTaskCounter counter = new StreamsTaskCounter(comp.getId()); for(int i=0; i < tasks; ++i) { StreamsTask task = comp.createConnectedTask(getTimeout()); + task.setStreamsTaskCounter(counter); task.setStreamConfig(this.streamConfig); this.futures.put(task, this.executor.submit(task)); compTasks.add(task); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d71568dc/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java index e602181..fea7e53 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java @@ -43,6 +43,7 @@ import org.apache.streams.core.StreamBuilder; import org.apache.streams.core.StreamsDatum; import 
org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.local.counters.StreamsTaskCounter; import org.apache.streams.local.queues.ThroughputQueue; import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor; import org.apache.streams.local.test.processors.SlowProcessor; @@ -81,10 +82,17 @@ public class LocalStreamBuilderTest extends RandomizedTest { } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) { //No-op } + try { + mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id)))); + } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) { + //No-op + } } } + + @Test public void testStreamIdValidations() { StreamBuilder builder = new LocalStreamBuilder(); @@ -104,7 +112,7 @@ public class LocalStreamBuilderTest extends RandomizedTest { exp = e; } assertNotNull(exp); - removeRegisteredMBeans("1", "2"); + removeRegisteredMBeans("1", "2", "id"); } @Test @@ -160,9 +168,9 @@ public class LocalStreamBuilderTest extends RandomizedTest { } } finally { for(int i=0; i < numProcessors; ++i) { - removeRegisteredMBeans(processorId+i); + removeRegisteredMBeans(processorId+i, processorId+i+"-"+PassthroughDatumCounterProcessor.class.getCanonicalName()); } - removeRegisteredMBeans("writer"); + removeRegisteredMBeans("writer", "numeric_provider"); } } @@ -199,7 +207,7 @@ public class LocalStreamBuilderTest extends RandomizedTest { for(int i=0; i < numProcessors; ++i) { removeRegisteredMBeans(processorId+i); } - removeRegisteredMBeans("writer"); + removeRegisteredMBeans("writer", "numeric_provider"); } } @@ -221,7 +229,9 @@ public class LocalStreamBuilderTest extends RandomizedTest { assertEquals(numDatums2, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get()); assertEquals(numDatums1+numDatums2, DatumCounterWriter.COUNTS.get("writer").get()); } finally { - 
removeRegisteredMBeans("proc1", "proc2", "writer1"); + String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); + String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); + removeRegisteredMBeans("proc1", "proc2", "writer1", "sp1", "sp2"); } } @@ -239,6 +249,9 @@ public class LocalStreamBuilderTest extends RandomizedTest { assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get()); assertEquals(numDatums*2, DatumCounterWriter.COUNTS.get("writer").get()); } finally { + String provClass = "-"+NumericMessageProvider.class.getCanonicalName(); + String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); + String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); removeRegisteredMBeans("prov1", "proc1", "proc2", "w1"); } } @@ -257,6 +270,9 @@ public class LocalStreamBuilderTest extends RandomizedTest { builder.start(); assertEquals(numDatums, DatumCounterWriter.COUNTS.get("writer").get()); } finally { + String provClass = "-"+NumericMessageProvider.class.getCanonicalName(); + String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); + String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); removeRegisteredMBeans("prov1", "proc1", "w1"); } } @@ -278,6 +294,9 @@ public class LocalStreamBuilderTest extends RandomizedTest { //We care mostly that it doesn't terminate too early. With thread shutdowns, etc, the actual time is indeterminate. 
Just make sure there is an upper bound assertThat((int) (end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout)))); } finally { + String provClass = "-"+NumericMessageProvider.class.getCanonicalName(); + String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); + String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); removeRegisteredMBeans("prov1", "proc1", "proc2", "w1"); } } @@ -305,6 +324,9 @@ public class LocalStreamBuilderTest extends RandomizedTest { service.awaitTermination(30000, TimeUnit.MILLISECONDS); assertThat(Thread.activeCount(), is(equalTo(before))); } finally { + String provClass = "-"+NumericMessageProvider.class.getCanonicalName(); + String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); + String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); removeRegisteredMBeans("prov1", "proc1", "w1"); } }
