Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-212 99e70b48c -> cc23286da
  refs/heads/STREAMS-246 7fe36e99e -> 21497e328
  refs/heads/master acc7c84f0 -> 536f85d34


resolves STREAMS-230


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dce7d357
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dce7d357
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dce7d357

Branch: refs/heads/STREAMS-212
Commit: dce7d357756cc717f261402883d56f9ad90aa84d
Parents: 1b55741
Author: sblackmon <[email protected]>
Authored: Thu Nov 20 15:55:27 2014 -0600
Committer: sblackmon <[email protected]>
Committed: Thu Nov 20 15:55:27 2014 -0600

----------------------------------------------------------------------
 .../monitoring/tasks/BroadcastMonitorThread.java     | 15 +++++++++++++++
 .../streams/local/builders/LocalStreamBuilder.java   |  8 +++++---
 .../local/builders/LocalStreamBuilderTest.java       |  4 +++-
 3 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dce7d357/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index fd9354a..1854163 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -52,14 +52,20 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
     public BroadcastMonitorThread(Map<String, Object> streamConfig) {
         keepRunning = true;
         this.streamConfig = streamConfig;
+
+        LOGGER.info("BroadcastMonitorThread starting" + streamConfig);
+
         server = ManagementFactory.getPlatformMBeanServer();
 
+
         setBroadcastURI();
         setWaitTime();
 
         messagePersister = new BroadcastMessagePersister(broadcastURI);
 
         initializeObjectMapper();
+
+        LOGGER.info("BroadcastMonitorThread started");
     }
 
     /**
@@ -85,6 +91,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
      */
     @Override
     public void run() {
+        LOGGER.info("BroadcastMonitorThread running");
         while(keepRunning) {
             try {
                 List<String> messages = Lists.newArrayList();
@@ -162,6 +169,14 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
         LOGGER.debug("Shutting down BroadcastMonitor Thread");
     }
 
+    public String getBroadcastURI() {
+        return broadcastURI;
+    }
+
+    public long getWaitTime() {
+        return waitTime;
+    }
+
     public long getDefaultWaitTime() {
         return DEFAULT_WAIT_TIME;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dce7d357/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index a9afc3c..e25a41b 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.streams.local.builders;
 
+import com.google.common.base.Strings;
 import org.apache.streams.core.*;
 import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
@@ -204,8 +205,10 @@ public class LocalStreamBuilder implements StreamBuilder {
         boolean forcedShutDown = false;
 
         try {
-            monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
-            this.monitor.submit(monitorThread);
+//            monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
+//            this.monitor.submit(monitorThread);
+            if( broadcastMonitor.getWaitTime() != -1 )
+                this.monitor.submit(broadcastMonitor);
             setupComponentTasks(tasks);
             setupProviderTasks(provTasks);
             LOGGER.info("Started stream with {} components", tasks.size());
@@ -313,7 +316,6 @@ public class LocalStreamBuilder implements StreamBuilder {
                 this.futures.put(task, this.executor.submit(task));
                 compTasks.add(task);
                 if( comp.isOperationCountable() ) {
-                    this.monitor.submit(broadcastMonitor);
                     this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
                     this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
                 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dce7d357/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index ed67003..28c8502 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -233,7 +233,9 @@ public class LocalStreamBuilderTest extends RandomizedTest {
             int numDatums2 = randomIntBetween(1, 300000);
             StreamsProcessor processor1 = new PassthroughDatumCounterProcessor("proc1");
             StreamsProcessor processor2 = new PassthroughDatumCounterProcessor("proc2");
-            StreamBuilder builder = new LocalStreamBuilder();
+            Map<String, Object> streamConfig = Maps.newHashMap();
+            streamConfig.put("monitoring_broadcast_interval_ms", -1);
+            StreamBuilder builder = new LocalStreamBuilder(streamConfig);
             builder.newPerpetualStream("sp1", new NumericMessageProvider(numDatums1))
                     .newPerpetualStream("sp2", new NumericMessageProvider(numDatums2))
                     .addStreamsProcessor("proc1", processor1, 1, "sp1")

Reply via email to