STREAMS-216 | All JMX monitoring beans now include identifying information and the time that the stream was started
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/fb8f9d20 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/fb8f9d20 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/fb8f9d20 Branch: refs/heads/STREAMS-216 Commit: fb8f9d209bf604a3b2127e22441e54b8e3259ad7 Parents: 9a77a8f Author: Robert Douglas <[email protected]> Authored: Fri Nov 14 11:17:23 2014 -0600 Committer: Robert Douglas <[email protected]> Committed: Fri Nov 14 11:17:23 2014 -0600 ---------------------------------------------------------------------- .../tasks/BroadcastMonitorThread.java | 14 ------ .../org/apache/streams/pojo/json/Broadcast.json | 4 ++ .../local/builders/LocalStreamBuilder.java | 35 ++++++++++--- .../streams/local/builders/StreamComponent.java | 31 +++++++----- .../local/counters/DatumStatusCounter.java | 9 +++- .../local/counters/StreamsTaskCounter.java | 16 ++++-- .../streams/local/queues/ThroughputQueue.java | 52 +++++++++++++++++--- .../streams/local/tasks/BaseStreamsTask.java | 39 ++++++++++++++- .../streams/local/tasks/StreamsMergeTask.java | 7 ++- .../local/tasks/StreamsPersistWriterTask.java | 11 +++-- .../local/tasks/StreamsProcessorTask.java | 14 ++++-- .../local/tasks/StreamsProviderTask.java | 11 +++-- .../local/counters/StreamsTaskCounterTest.java | 16 +++--- .../streams/local/tasks/BasicTasksTest.java | 6 +-- .../local/tasks/StreamsProviderTaskTest.java | 10 ++-- 15 files changed, 202 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java index 25a5030..10b60b1 100644 --- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java +++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java @@ -46,7 +46,6 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple private ObjectMapper objectMapper; private Map<String, Object> streamConfig; private String broadcastURI = null; - private String streamName = null; private MessagePersister messagePersister; private volatile boolean keepRunning; @@ -56,7 +55,6 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple server = ManagementFactory.getPlatformMBeanServer(); setBroadcastURI(); - setStreamName(); setWaitTime(); messagePersister = new BroadcastMessagePersister(broadcastURI); @@ -108,7 +106,6 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple } if(broadcast != null) { - broadcast.setStreamIdentifier(streamName); messages.add(objectMapper.writeValueAsString(broadcast)); } } @@ -136,17 +133,6 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple } } - private void setStreamName() { - if (streamConfig != null && - streamConfig.containsKey("streamsID") && - streamConfig.get("streamsID") != null && - streamConfig.get("streamsID") instanceof String) { - streamName = streamConfig.get("streamsID").toString(); - } else { - streamName = "{\"streamName\":\"Unknown Stream\"}"; - } - } - /** * Go through streams config and set the thread's wait time (if present) */ http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json index 94ec147..687ef9c 100644 --- a/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json +++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json @@ -12,6 +12,10 @@ "streamIdentifier": { "type": "string", "description": "The name of the Stream that is currently executing" + }, + "startedAt": { + "type": "integer", + "description": "Milliseconds since epoch when this Stream was started" } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index e3e42ff..19d50e1 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -51,6 +51,8 @@ public class LocalStreamBuilder implements StreamBuilder { public static final String BROADCAST_KEY = "broadcastURI"; public static final String STREAM_IDENTIFIER_KEY = "streamsID"; public static final String BROADCAST_INTERVAL_KEY = "monitoring_broadcast_interval_ms"; + public static final String DEFAULT_STREAM_IDENTIFIER = "Unknown_Stream"; + public static final String DEFAULT_STARTED_AT_KEY = "startedAt"; private Map<String, StreamComponent> providers; private Map<String, StreamComponent> components; @@ -65,6 +67,8 @@ public class LocalStreamBuilder implements StreamBuilder { private Thread shutdownHook; private BroadcastMonitorThread broadcastMonitor; private int maxQueueCapacity; + private String streamIdentifier = DEFAULT_STREAM_IDENTIFIER; + private DateTime startedAt = new DateTime(); /** * Creates a local stream builder with no config object and default maximum internal queue size of 500 @@ -112,6 +116,11 @@ public class LocalStreamBuilder implements StreamBuilder { } }; + setStreamIdentifier(); + if(this.streamConfig != null) { + this.streamConfig.put(DEFAULT_STARTED_AT_KEY, startedAt.getMillis()); + } + this.broadcastMonitor = new BroadcastMonitorThread(this.streamConfig); this.futures = new HashMap<>(); @@ -120,7 +129,7 @@ public class LocalStreamBuilder implements StreamBuilder { @Override public StreamBuilder newPerpetualStream(String id, StreamsProvider provider) { validateId(id); - this.providers.put(id, new StreamComponent(id, provider, true)); + this.providers.put(id, new StreamComponent(id, provider, true, streamConfig)); ++this.totalTasks; if( provider instanceof DatumStatusCountable ) ++this.monitorTasks; @@ -130,7 +139,7 @@ public class LocalStreamBuilder implements StreamBuilder { @Override public StreamBuilder newReadCurrentStream(String id, StreamsProvider provider) { validateId(id); - this.providers.put(id, new StreamComponent(id, provider, false)); + this.providers.put(id, new StreamComponent(id, provider, false, streamConfig)); ++this.totalTasks; if( provider instanceof DatumStatusCountable ) ++this.monitorTasks; @@ -140,7 +149,7 @@ public class LocalStreamBuilder implements StreamBuilder { @Override public StreamBuilder newReadNewStream(String id, StreamsProvider provider, BigInteger sequence) { validateId(id); - this.providers.put(id, new StreamComponent(id, provider, sequence)); + this.providers.put(id, new StreamComponent(id, provider, sequence, streamConfig)); ++this.totalTasks; if( provider instanceof DatumStatusCountable ) ++this.monitorTasks; @@ -150,7 +159,7 @@ public class LocalStreamBuilder implements StreamBuilder { @Override public StreamBuilder newReadRangeStream(String id, StreamsProvider provider, DateTime start, DateTime end) { validateId(id); - this.providers.put(id, new StreamComponent(id, provider, start, end)); + this.providers.put(id, new StreamComponent(id, provider, start, end, streamConfig)); ++this.totalTasks; if( provider instanceof DatumStatusCountable ) ++this.monitorTasks; @@ -160,7 +169,7 @@ public class LocalStreamBuilder implements StreamBuilder { @Override public StreamBuilder addStreamsProcessor(String id, StreamsProcessor processor, int numTasks, String... inBoundIds) { validateId(id); - StreamComponent comp = new StreamComponent(id, processor, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id), numTasks); + StreamComponent comp = new StreamComponent(id, processor, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, startedAt.getMillis()), numTasks, streamConfig); this.components.put(id, comp); connectToOtherComponents(inBoundIds, comp); this.totalTasks += numTasks; @@ -172,7 +181,7 @@ public class LocalStreamBuilder implements StreamBuilder { @Override public StreamBuilder addStreamsPersistWriter(String id, StreamsPersistWriter writer, int numTasks, String... inBoundIds) { validateId(id); - StreamComponent comp = new StreamComponent(id, writer, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id), numTasks); + StreamComponent comp = new StreamComponent(id, writer, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id, streamIdentifier, startedAt.getMillis()), numTasks, streamConfig); this.components.put(id, comp); connectToOtherComponents(inBoundIds, comp); this.totalTasks += numTasks; @@ -281,7 +290,7 @@ public class LocalStreamBuilder implements StreamBuilder { for(StreamComponent prov : this.providers.values()) { StreamsTask task = prov.createConnectedTask(getTimeout()); task.setStreamConfig(this.streamConfig); - StreamsTaskCounter counter = new StreamsTaskCounter(prov.getId()); + StreamsTaskCounter counter = new StreamsTaskCounter(prov.getId(), streamIdentifier, startedAt.getMillis()); task.setStreamsTaskCounter(counter); this.executor.submit(task); provTasks.put(prov.getId(), (StreamsProviderTask) task); @@ -296,7 +305,7 @@ public class LocalStreamBuilder implements StreamBuilder { for(StreamComponent comp : this.components.values()) { int tasks = comp.getNumTasks(); List<StreamsTask> compTasks = new LinkedList<StreamsTask>(); - StreamsTaskCounter counter = new StreamsTaskCounter(comp.getId()); + StreamsTaskCounter counter = new StreamsTaskCounter(comp.getId(), streamIdentifier, startedAt.getMillis()); for(int i=0; i < tasks; ++i) { StreamsTask task = comp.createConnectedTask(getTimeout()); task.setStreamsTaskCounter(counter); @@ -410,4 +419,14 @@ public class LocalStreamBuilder implements StreamBuilder { return streamConfig != null && streamConfig.containsKey(TIMEOUT_KEY) ? (Integer)streamConfig.get(TIMEOUT_KEY) : -1; } + private void setStreamIdentifier() { + if(streamConfig.containsKey(STREAM_IDENTIFIER_KEY) && + streamConfig.get(STREAM_IDENTIFIER_KEY) != null && + streamConfig.get(STREAM_IDENTIFIER_KEY).toString().length() > 0) { + this.streamIdentifier = streamConfig.get(STREAM_IDENTIFIER_KEY).toString(); + } else { + this.streamIdentifier = DEFAULT_STREAM_IDENTIFIER; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java index 4b96c5b..0dcc4d0 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java @@ -51,15 +51,18 @@ public class StreamComponent { private int numTasks = 1; private boolean perpetual; + private Map<String, Object> streamConfig; + /** * * @param id * @param provider */ - public StreamComponent(String id, StreamsProvider provider, boolean perpetual) { + public StreamComponent(String id, StreamsProvider provider, boolean perpetual, Map<String, Object> streamConfig) { this.id = id; this.provider = provider; this.perpetual = perpetual; + this.streamConfig = streamConfig; initializePrivateVariables(); } @@ -70,12 +73,13 @@ public class StreamComponent { * @param start * @param end */ - public StreamComponent(String id, StreamsProvider provider, DateTime start, DateTime end) { + public StreamComponent(String id, StreamsProvider provider, DateTime start, DateTime end, Map<String, Object> streamConfig) { this.id = id; this.provider = provider; this.dateRange = new DateTime[2]; this.dateRange[START] = start; this.dateRange[END] = end; + this.streamConfig = streamConfig; initializePrivateVariables(); } @@ -86,10 +90,11 @@ public class StreamComponent { * @param provider * @param sequence */ - public StreamComponent(String id, StreamsProvider provider, BigInteger sequence) { + public StreamComponent(String id, StreamsProvider provider, BigInteger sequence, Map<String, Object> streamConfig) { this.id = id; this.provider = provider; this.sequence = sequence; + this.streamConfig = streamConfig; } /** @@ -99,11 +104,12 @@ public class StreamComponent { * @param inQueue * @param numTasks */ - public StreamComponent(String id, StreamsProcessor processor, BlockingQueue<StreamsDatum> inQueue, int numTasks) { + public StreamComponent(String id, StreamsProcessor processor, BlockingQueue<StreamsDatum> inQueue, int numTasks, Map<String, Object> streamConfig) { this.id = id; this.processor = processor; this.inQueue = inQueue; this.numTasks = numTasks; + this.streamConfig = streamConfig; initializePrivateVariables(); } @@ -114,11 +120,12 @@ public class StreamComponent { * @param inQueue * @param numTasks */ - public StreamComponent(String id, StreamsPersistWriter writer, BlockingQueue<StreamsDatum> inQueue, int numTasks) { + public StreamComponent(String id, StreamsPersistWriter writer, BlockingQueue<StreamsDatum> inQueue, int numTasks, Map<String, Object> streamConfig) { this.id = id; this.writer = writer; this.inQueue = inQueue; this.numTasks = numTasks; + this.streamConfig = streamConfig; initializePrivateVariables(); } @@ -187,13 +194,13 @@ public class StreamComponent { StreamsTask task; if(this.processor != null) { if(this.numTasks > 1) { - task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor)); + task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor), streamConfig); task.addInputQueue(this.inQueue); for(BlockingQueue<StreamsDatum> q : this.outBound.values()) { task.addOutputQueue(q); } } else { - task = new StreamsProcessorTask(this.processor); + task = new StreamsProcessorTask(this.processor, streamConfig); task.addInputQueue(this.inQueue); for(BlockingQueue<StreamsDatum> q : this.outBound.values()) { task.addOutputQueue(q); @@ -202,10 +209,10 @@ public class StreamComponent { } else if(this.writer != null) { if(this.numTasks > 1) { - task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer)); + task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer), streamConfig); task.addInputQueue(this.inQueue); } else { - task = new StreamsPersistWriterTask(this.writer); + task = new StreamsPersistWriterTask(this.writer, streamConfig); task.addInputQueue(this.inQueue); } } @@ -217,11 +224,11 @@ public class StreamComponent { prov = this.provider; } if(this.dateRange == null && this.sequence == null) - task = new StreamsProviderTask(prov, this.perpetual); + task = new StreamsProviderTask(prov, this.perpetual, streamConfig); else if(this.sequence != null) - task = new StreamsProviderTask(prov, this.sequence); + task = new StreamsProviderTask(prov, this.sequence, streamConfig); else - task = new StreamsProviderTask(prov, this.dateRange[0], this.dateRange[1]); + task = new StreamsProviderTask(prov, this.dateRange[0], this.dateRange[1], streamConfig); //Adjust the timeout if necessary if(timeout != 0) { ((StreamsProviderTask)task).setTimeout(timeout); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java index acada71..34d2bcc 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java @@ -18,6 +18,7 @@ package org.apache.streams.local.counters; import net.jcip.annotations.ThreadSafe; +import org.apache.streams.local.builders.LocalStreamBuilder; import org.apache.streams.util.ComponentUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,16 +33,20 @@ import java.util.concurrent.atomic.AtomicLong; @ThreadSafe public class DatumStatusCounter implements DatumStatusCounterMXBean{ - public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s"; + public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s,identifier=%s,startedAt=%s"; private static final Logger LOGGER = LoggerFactory.getLogger(DatumStatusCounter.class); private AtomicLong failed; private AtomicLong passed; public DatumStatusCounter(String id) { + this(id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); + } + + public DatumStatusCounter(String id, String streamIdentifier, long startedAt) { this.failed = new AtomicLong(0); this.passed = new AtomicLong(0); - ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id), this); + ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id, streamIdentifier, startedAt), this); } public void incrementFailedCount() { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java index 68c6364..9bd5d49 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java @@ -19,7 +19,9 @@ package org.apache.streams.local.counters; import net.jcip.annotations.GuardedBy; import net.jcip.annotations.ThreadSafe; +import org.apache.streams.local.builders.LocalStreamBuilder; import org.apache.streams.util.ComponentUtils; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,9 +33,9 @@ import java.util.concurrent.atomic.AtomicLong; * */ @ThreadSafe -public class StreamsTaskCounter implements StreamsTaskCounterMXBean{ +public class StreamsTaskCounter implements StreamsTaskCounterMXBean { - public static final String NAME_TEMPLATE = "org.apache.streams.local:type=StreamsTaskCounter,name=%s"; + public static final String NAME_TEMPLATE = "org.apache.streams.local:type=StreamsTaskCounter,name=%s,identifier=%s,startedAt=%s"; private static final Logger LOGGER = LoggerFactory.getLogger(StreamsTaskCounter.class); private AtomicLong emitted; @@ -48,12 +50,20 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{ * @param id */ public StreamsTaskCounter(String id) { + this(id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); + } + + /** + * + * @param id + */ + public StreamsTaskCounter(String id, String streamId, long startedAt) { this.emitted = new AtomicLong(0); this.received = new AtomicLong(0); this.errors = new AtomicLong(0); this.totalTime = new AtomicLong(0); this.maxTime = -1; - ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id), this); + ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id, streamId, startedAt), this); } /** http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java index de1add3..e4a6ab9 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/queues/ThroughputQueue.java @@ -17,6 +17,7 @@ */ package org.apache.streams.local.queues; +import org.apache.streams.local.builders.LocalStreamBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sun.reflect.generics.reflectiveObjects.NotImplementedException; @@ -43,7 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; */ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBean { - public static final String NAME_TEMPLATE = "org.apache.streams.local:type=ThroughputQueue,name=%s"; + public static final String NAME_TEMPLATE = "org.apache.streams.local:type=ThroughputQueue,name=%s,identifier=%s,startedAt=%s"; private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueue.class); @@ -60,7 +61,16 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe * Creates an unbounded, unregistered {@code ThroughputQueue} */ public ThroughputQueue() { - this(-1, null); + this(-1, null, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); + } + + /** + * + * @param streamIdentifier + * @param startedAt + */ + public ThroughputQueue(String streamIdentifier, long startedAt) { + this(-1, null, streamIdentifier, startedAt); } /** @@ -69,7 +79,17 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded */ public ThroughputQueue(int maxSize) { - this(maxSize, null); + this(maxSize, null, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); + } + + /** + * + * @param maxSize + * @param streamIdentifier + * @param startedAt + */ + public ThroughputQueue(int maxSize, String streamIdentifier, long startedAt) { + this(maxSize, null, streamIdentifier, startedAt); } /** @@ -78,7 +98,27 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe * @param id unique id for this queue to be registered with. if id == NULL then not registered */ public ThroughputQueue(String id) { - this(-1, id); + this(-1, id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); + } + + /** + * + * @param id + * @param streamIdentifier + * @param startedAt + */ + public ThroughputQueue(String id, String streamIdentifier, long startedAt) { + this(-1, id, streamIdentifier, startedAt); + } + + /** + * + * @param maxSize + * @param id + */ + public ThroughputQueue(int maxSize, String id) { + this(maxSize, id, LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER, -1); + } /** @@ -87,7 +127,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe * @param maxSize maximum capacity of queue, if maxSize < 1 then unbounded * @param id unique id for this queue to be registered with. if id == NULL then not registered */ - public ThroughputQueue(int maxSize, String id) { + public ThroughputQueue(int maxSize, String id, String streamIdentifier, long startedAt) { if (maxSize < 1) { this.underlyingQueue = new LinkedBlockingQueue<>(); } else { @@ -102,7 +142,7 @@ public class ThroughputQueue<E> implements BlockingQueue<E>, ThroughputQueueMXBe this.totalQueueTime = new AtomicLong(0); if (id != null) { try { - ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id)); + ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id, streamIdentifier, startedAt)); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); mbs.registerMBean(this, name); } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java index 6755d77..cfb231d 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.local.builders.LocalStreamBuilder; import org.apache.streams.pojo.json.Activity; import org.apache.streams.util.ComponentUtils; import org.apache.streams.util.SerializationUtil; @@ -45,10 +46,18 @@ public abstract class BaseStreamsTask implements StreamsTask { private List<BlockingQueue<StreamsDatum>> outQueues = new LinkedList<BlockingQueue<StreamsDatum>>(); private int inIndex = 0; private ObjectMapper mapper; + protected Map<String, Object> streamConfig; - public BaseStreamsTask() { + private long startedAt; + private String streamIdentifier; + + public BaseStreamsTask(Map<String, Object> config) { this.mapper = new StreamsJacksonMapper(); this.mapper.registerSubtypes(Activity.class); + this.streamConfig = config; + + setStreamIdentifier(); + setStartedAt(); } @@ -190,4 +199,32 @@ public abstract class BaseStreamsTask implements StreamsTask { } return copyTo; } + + public long getStartedAt() { + return startedAt; + } + + public void setStartedAt() { + if(streamConfig.containsKey(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) && + streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) != null && + streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY) instanceof Long) { + this.startedAt = Long.parseLong(streamConfig.get(LocalStreamBuilder.DEFAULT_STARTED_AT_KEY).toString()); + } else { + this.startedAt = -1; + } + } + + public String getStreamIdentifier() { + return streamIdentifier; + } + + public void setStreamIdentifier() { + if(streamConfig.containsKey(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) && + streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY) != null && + streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY).toString().length() > 0) { + this.streamIdentifier = streamConfig.get(LocalStreamBuilder.STREAM_IDENTIFIER_KEY).toString(); + } else { + this.streamIdentifier = LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java index 8280f29..27d1c6e 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java @@ -37,10 +37,15 @@ public class StreamsMergeTask extends BaseStreamsTask { private long sleepTime; public StreamsMergeTask() { - this(DEFAULT_SLEEP_TIME_MS); + this(DEFAULT_SLEEP_TIME_MS, null); } public StreamsMergeTask(long sleepTime) { + this(sleepTime, null); + } + + public StreamsMergeTask(long sleepTime, Map<String, Object> streamConfig) { + super(streamConfig); this.sleepTime = sleepTime; this.keepRunning = new AtomicBoolean(true); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java index 003ab9e..235ee92 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java @@ -58,7 +58,11 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt * @param writer writer to execute in task */ public StreamsPersistWriterTask(StreamsPersistWriter writer) { - this(writer, DEFAULT_SLEEP_TIME_MS); + this(writer, DEFAULT_SLEEP_TIME_MS, null); + } + + public StreamsPersistWriterTask(StreamsPersistWriter writer, Map<String, Object> streamConfig) { + this(writer, DEFAULT_SLEEP_TIME_MS, streamConfig); } /** @@ -66,7 +70,8 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt * @param writer writer to execute in task * @param sleepTime time to sleep when inbound queue is empty. */ - public StreamsPersistWriterTask(StreamsPersistWriter writer, long sleepTime) { + public StreamsPersistWriterTask(StreamsPersistWriter writer, long sleepTime, Map<String, Object> streamConfig) { + super(streamConfig); this.writer = writer; this.sleepTime = sleepTime; this.keepRunning = new AtomicBoolean(true); @@ -99,7 +104,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt try { this.writer.prepare(this.streamConfig); if(this.counter == null) { - this.counter = new StreamsTaskCounter(this.writer.getClass().getName()+ UUID.randomUUID().toString()); + this.counter = new StreamsTaskCounter(this.writer.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt()); } while(this.keepRunning.get()) { StreamsDatum datum = null; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java index b6ab498..c470d0b 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java @@ -59,15 +59,23 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus * @param processor process to run in task */ public StreamsProcessorTask(StreamsProcessor processor) { - this(processor, DEFAULT_SLEEP_TIME_MS); + this(processor, DEFAULT_SLEEP_TIME_MS, null); } /** * + * @param processor + * @param streamConfig + */ + public StreamsProcessorTask(StreamsProcessor processor, Map<String, Object> streamConfig) { this(processor, DEFAULT_SLEEP_TIME_MS, streamConfig); } + + /** + * * @param processor processor to run in task * @param sleepTime time to sleep when incoming queue is empty */ - public StreamsProcessorTask(StreamsProcessor processor, long sleepTime) { + public StreamsProcessorTask(StreamsProcessor processor, long sleepTime, Map<String, Object> streamConfig) { + super(streamConfig); this.processor = processor; this.sleepTime = sleepTime; this.keepRunning = new AtomicBoolean(true); @@ -105,7 +113,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus try { this.processor.prepare(this.streamConfig); if(this.counter == null) { - this.counter = new StreamsTaskCounter(this.processor.getClass().getName()+ UUID.randomUUID().toString()); + this.counter = new StreamsTaskCounter(this.processor.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt()); } while(this.keepRunning.get()) { StreamsDatum datum = null; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java index 2475780..8c87d7a 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java @@ -72,7 +72,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()} * @param provider */ - public StreamsProviderTask(StreamsProvider provider, boolean perpetual) { + public StreamsProviderTask(StreamsProvider provider, boolean perpetual, Map<String, Object> streamConfig) { + super(streamConfig); this.provider = provider; if( perpetual ) this.type = Type.PERPETUAL; @@ -87,7 +88,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC * @param provider * @param sequence */ - public StreamsProviderTask(StreamsProvider provider, BigInteger sequence) { + public StreamsProviderTask(StreamsProvider provider, BigInteger sequence, Map<String, Object> streamConfig) { + super(streamConfig); this.provider = provider; this.type = Type.READ_NEW; this.sequence = sequence; @@ -101,7 +103,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC * @param start * @param end */ - public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end) { + public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end, Map<String, Object> streamConfig) { + super(streamConfig); this.provider = provider; this.type = Type.READ_RANGE; this.dateRange = new DateTime[2]; @@ -149,7 +152,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC //Negative values mean we want to run forever long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime); if(this.counter == null) { //should never be null - this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString()); + this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt()); } switch(this.type) { case PERPETUAL: { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java index a001845..da567fe 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java @@ -52,7 +52,7 @@ public class StreamsTaskCounterTest extends RandomizedTest { @Test public void testConstructor() { try { - new StreamsTaskCounter(MBEAN_ID); + new StreamsTaskCounter(MBEAN_ID, null, -1); } catch (Throwable t) { fail("Constructor threw error : "+t.getMessage()); } @@ -65,7 +65,7 @@ public class StreamsTaskCounterTest extends RandomizedTest { @Test @Repeat(iterations = 3) public void testEmitted() throws Exception { - StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID); + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1); int numIncrements = randomIntBetween(1, 100000); for(int i=0; i < numIncrements; ++i) { counter.incrementEmittedCount(); @@ -74,7 +74,7 @@ public class StreamsTaskCounterTest extends RandomizedTest { unregisterMXBean(); - counter = new StreamsTaskCounter(MBEAN_ID); + counter = new StreamsTaskCounter(MBEAN_ID, null, -1); numIncrements = randomIntBetween(1, 100000); long total = 0; for(int i=0; i < numIncrements; ++i) { @@ -92,7 +92,7 @@ public class StreamsTaskCounterTest extends RandomizedTest { @Test @Repeat(iterations = 3) public void testReceived() throws Exception { - StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID); + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1); int numIncrements = randomIntBetween(1, 100000); for(int i=0; i < numIncrements; ++i) { counter.incrementReceivedCount(); @@ -101,7 +101,7 @@ public class StreamsTaskCounterTest extends RandomizedTest { unregisterMXBean(); - counter = new StreamsTaskCounter(MBEAN_ID); + counter = new StreamsTaskCounter(MBEAN_ID, null, -1); numIncrements = randomIntBetween(1, 100000); long total = 0; for(int i=0; i < numIncrements; ++i) { @@ -119,7 +119,7 @@ public class StreamsTaskCounterTest extends RandomizedTest { @Test @Repeat(iterations = 3) public void testError() throws Exception { - StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID); + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1); int numIncrements = randomIntBetween(1, 100000); for(int i=0; i < numIncrements; ++i) { counter.incrementErrorCount(); @@ -128,7 +128,7 @@ public class StreamsTaskCounterTest extends RandomizedTest { unregisterMXBean(); - counter = new StreamsTaskCounter(MBEAN_ID); + counter = new StreamsTaskCounter(MBEAN_ID, null, -1); numIncrements = randomIntBetween(1, 100000); long total = 0; for(int i=0; i < numIncrements; ++i) { @@ -146,7 +146,7 @@ public class StreamsTaskCounterTest extends RandomizedTest { @Test @Repeat(iterations = 3) public void testErrorRate() throws Exception { - StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID); + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1); assertEquals(0.0, counter.getErrorRate(), 0); int failures = randomIntBetween(0, 100000); int received = randomIntBetween(0, 100000); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java index a0e28cd..d5efc41 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java @@ -57,7 +57,7 @@ public class BasicTasksTest { public void testProviderTask() { int numMessages = 100; NumericMessageProvider provider = new NumericMessageProvider(numMessages); - StreamsProviderTask task = new StreamsProviderTask(provider, false); + StreamsProviderTask task = new StreamsProviderTask(provider, false, null); BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); task.addOutputQueue(outQueue); //Test that adding input queues to providers is not valid @@ -101,7 +101,7 @@ public class BasicTasksTest { int numMessages = 100; PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor(""); StreamsProcessorTask task = new StreamsProcessorTask(processor); - StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID); + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1); task.setStreamsTaskCounter(counter); BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); @@ -145,7 +145,7 @@ public class BasicTasksTest { int numMessages = 100; DatumCounterWriter writer = new DatumCounterWriter(""); StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer); - StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID); + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1); task.setStreamsTaskCounter(counter); BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f9d20/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java index 5e18650..222566d 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java @@ -61,7 +61,7 @@ public class StreamsProviderTaskTest { @Test public void runPerpetual() { - StreamsProviderTask task = new StreamsProviderTask(mockProvider, true); + StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null); when(mockProvider.isRunning()).thenReturn(true); when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>())); task.setTimeout(500); @@ -75,7 +75,7 @@ public class StreamsProviderTaskTest { @Test public void flushes() { BlockingQueue<StreamsDatum> out = new LinkedBlockingQueue<>(); - StreamsProviderTask task = new StreamsProviderTask(mockProvider, true); + StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null); when(mockProvider.isRunning()).thenReturn(true); when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(getQueue(3))); task.setTimeout(100); @@ -95,7 +95,7 @@ public class StreamsProviderTaskTest { @Test public void runNonPerpetual() { - StreamsProviderTask task = new StreamsProviderTask(mockProvider, false); + StreamsProviderTask task = new StreamsProviderTask(mockProvider, false, null); when(mockProvider.isRunning()).thenReturn(true); when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>())); task.setTimeout(500); @@ -108,7 +108,7 @@ public class StreamsProviderTaskTest { @Test public void stoppable() throws InterruptedException { - StreamsProviderTask task = new StreamsProviderTask(mockProvider, true); + StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null); when(mockProvider.isRunning()).thenReturn(true); when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>())); task.setTimeout(-1); @@ -129,7 +129,7 @@ public class StreamsProviderTaskTest { @Test public void earlyException() throws InterruptedException { - StreamsProviderTask task = new StreamsProviderTask(mockProvider, true); + StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null); when(mockProvider.isRunning()).thenReturn(true); doThrow(new RuntimeException()).when(mockProvider).prepare(null); task.setTimeout(-1);
