Repository: incubator-streams Updated Branches: refs/heads/STREAMS-212 99e70b48c -> cc23286da refs/heads/STREAMS-246 7fe36e99e -> 21497e328 refs/heads/master acc7c84f0 -> 536f85d34
resolves STREAMS-230 Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dce7d357 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dce7d357 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dce7d357 Branch: refs/heads/STREAMS-212 Commit: dce7d357756cc717f261402883d56f9ad90aa84d Parents: 1b55741 Author: sblackmon <[email protected]> Authored: Thu Nov 20 15:55:27 2014 -0600 Committer: sblackmon <[email protected]> Committed: Thu Nov 20 15:55:27 2014 -0600 ---------------------------------------------------------------------- .../monitoring/tasks/BroadcastMonitorThread.java | 15 +++++++++++++++ .../streams/local/builders/LocalStreamBuilder.java | 8 +++++--- .../local/builders/LocalStreamBuilderTest.java | 4 +++- 3 files changed, 23 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dce7d357/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java index fd9354a..1854163 100644 --- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java +++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java @@ -52,14 +52,20 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple public BroadcastMonitorThread(Map<String, Object> streamConfig) { keepRunning = true; this.streamConfig = streamConfig; + + LOGGER.info("BroadcastMonitorThread starting" + streamConfig); + server = ManagementFactory.getPlatformMBeanServer(); + setBroadcastURI(); setWaitTime(); messagePersister = new BroadcastMessagePersister(broadcastURI); initializeObjectMapper(); + + LOGGER.info("BroadcastMonitorThread started"); } /** @@ -85,6 +91,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple */ @Override public void run() { + LOGGER.info("BroadcastMonitorThread running"); while(keepRunning) { try { List<String> messages = Lists.newArrayList(); @@ -162,6 +169,14 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple LOGGER.debug("Shutting down BroadcastMonitor Thread"); } + public String getBroadcastURI() { + return broadcastURI; + } + + public long getWaitTime() { + return waitTime; + } + public long getDefaultWaitTime() { return DEFAULT_WAIT_TIME; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dce7d357/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index a9afc3c..e25a41b 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -18,6 +18,7 @@ package org.apache.streams.local.builders; +import com.google.common.base.Strings; import org.apache.streams.core.*; import org.apache.streams.local.counters.StreamsTaskCounter; import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor; @@ -204,8 +205,10 @@ public class LocalStreamBuilder implements StreamBuilder { boolean forcedShutDown = false; try { - monitorThread = new LocalStreamProcessMonitorThread(executor, 10); - this.monitor.submit(monitorThread); +// monitorThread = new LocalStreamProcessMonitorThread(executor, 10); +// this.monitor.submit(monitorThread); + if( broadcastMonitor.getWaitTime() != -1 ) + this.monitor.submit(broadcastMonitor); setupComponentTasks(tasks); setupProviderTasks(provTasks); LOGGER.info("Started stream with {} components", tasks.size()); @@ -313,7 +316,6 @@ public class LocalStreamBuilder implements StreamBuilder { this.futures.put(task, this.executor.submit(task)); compTasks.add(task); if( comp.isOperationCountable() ) { - this.monitor.submit(broadcastMonitor); this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10)); this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10)); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dce7d357/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java index ed67003..28c8502 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java @@ -233,7 +233,9 @@ public class LocalStreamBuilderTest extends RandomizedTest { int numDatums2 = randomIntBetween(1, 300000); StreamsProcessor processor1 = new PassthroughDatumCounterProcessor("proc1"); StreamsProcessor processor2 = new PassthroughDatumCounterProcessor("proc2"); - StreamBuilder builder = new LocalStreamBuilder(); + Map<String, Object> streamConfig = Maps.newHashMap(); + streamConfig.put("monitoring_broadcast_interval_ms", -1); + StreamBuilder builder = new LocalStreamBuilder(streamConfig); builder.newPerpetualStream("sp1", new NumericMessageProvider(numDatums1)) .newPerpetualStream("sp2", new NumericMessageProvider(numDatums2)) .addStreamsProcessor("proc1", processor1, 1, "sp1")
