Author: chirino
Date: Thu Mar 12 18:40:32 2009
New Revision: 752964
URL: http://svn.apache.org/viewvc?rev=752964&view=rev
Log:
adding files I missed adding in the last commit.
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowManager.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowRelay.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ClientConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockClient.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java?rev=752964&r1=752963&r2=752964&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
Thu Mar 12 18:40:32 2009
@@ -45,7 +45,7 @@
return resourceName;
}
- protected void setResourceName(String resourceName) {
+ public void setResourceName(String resourceName) {
this.resourceName = resourceName;
}
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowManager.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowManager.java?rev=752964&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowManager.java
(added)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowManager.java
Thu Mar 12 18:40:32 2009
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Set;
+
+/**
+ * Registry of the {@link IFlowResource}s and {@link Flow}s known to one
+ * manager instance.
+ *
+ * Resources are indexed by their resource id; flows are indexed both by
+ * flow id and by name. Every method that touches the internal maps is
+ * synchronized on this instance, and the collection getters hand back
+ * snapshots rather than live views, so callers may iterate the results
+ * without holding the manager's lock.
+ */
+public class FlowManager
+{
+
+    // Not read or written by any method of this class.
+    protected long resourceCounter = 0;
+
+    private final HashMap<Long, IFlowResource> resources = new HashMap<Long, IFlowResource>();
+    private final HashMap<Long, Flow> flows = new HashMap<Long, Flow>();
+    private final HashMap<String, Flow> flowsByName = new HashMap<String, Flow>();
+
+    /**
+     * Registers a resource under the id reported by
+     * {@code resource.getResourceId()}, replacing any resource previously
+     * stored under that id.
+     *
+     * @param resource the resource to register
+     */
+    public synchronized void registerResource(IFlowResource resource)
+    {
+        resources.put(resource.getResourceId(), resource);
+    }
+
+    /**
+     * Returns the flow registered under {@code name}, creating and
+     * registering a new one when none exists yet.
+     *
+     * @param name    the flow name used as the lookup key
+     * @param dynamic passed to the {@link Flow} constructor when a new flow
+     *                is created; ignored when a flow with that name exists
+     * @return the existing or newly created flow
+     */
+    public synchronized Flow createFlow(String name, boolean dynamic)
+    {
+        //TODO Should assign the flow based off of the hashcode of the
+        //name, and handle collisions.
+        //TODO Also implement dynamic flows whereby resources participating
+        //in a flow register with it, so that flows can be expired.
+        Flow flow = getFlow(name);
+        if (flow != null)
+        {
+            return flow;
+        }
+
+        flow = new Flow(name, dynamic);
+        flowsByName.put(name, flow);
+        flows.put(flow.getFlowID(), flow);
+        return flow;
+    }
+
+    /**
+     * @param id a resource id
+     * @return the resource registered under {@code id}, or null if none
+     */
+    public synchronized IFlowResource getResource(long id) {
+        return resources.get(id);
+    }
+
+    /**
+     * @param id a flow id
+     * @return the flow registered under {@code id}, or null if none
+     */
+    public synchronized Flow getFlow(long id) {
+        return flows.get(id);
+    }
+
+    /**
+     * @param name a flow name
+     * @return the flow registered under {@code name}, or null if none
+     */
+    public synchronized Flow getFlow(String name) {
+        return flowsByName.get(name);
+    }
+
+    /**
+     * Unregisters a resource.
+     *
+     * Synchronized like every other accessor of the resource map; an
+     * unsynchronized remove could corrupt the HashMap when it races with
+     * {@link #registerResource(IFlowResource)}.
+     *
+     * @param id the id of the resource to remove
+     * @return the removed resource, or null if none was registered
+     */
+    public synchronized IFlowResource removeResource(long id) {
+        return resources.remove(id);
+    }
+
+    /**
+     * @return a snapshot of the registered flows; later changes to the
+     *         manager are not reflected in the returned collection
+     */
+    public synchronized Collection<Flow> getRegisteredFlows() {
+        // Copy under the lock: HashMap.values() is a live, fail-fast view.
+        return new ArrayList<Flow>(flows.values());
+    }
+
+    /**
+     * @return a snapshot of the registered resources; later changes to the
+     *         manager are not reflected in the returned collection
+     */
+    public synchronized Collection<IFlowResource> getRegisteredResources() {
+        // Copy under the lock: HashMap.values() is a live, fail-fast view.
+        return new ArrayList<IFlowResource>(resources.values());
+    }
+
+    /**
+     * Returns a list of flow resource ids. Used for tooling.
+     *
+     * @return a new list holding the ids of all registered resources
+     */
+    public synchronized ArrayList<Long> getRegisteredResourceIDs()
+    {
+        return new ArrayList<Long>(resources.keySet());
+    }
+
+    /**
+     * Returns a list of flow ids. Used for tooling.
+     *
+     * @return a new list holding the ids of all registered flows
+     */
+    public synchronized ArrayList<Long> getRegisteredFlowIDs()
+    {
+        return new ArrayList<Long>(flows.keySet());
+    }
+}
+ /*
+ private class FlowMetricsCollector
+ implements Runnable
+ {
+ private final long m_collectionInterval = 10000;
+
+ private Thread m_thread;
+
+ private boolean m_started;
+
+ FlowMetricsCollector()
+ {
+
+ }
+
+ public void start()
+ {
+ synchronized ( this )
+ {
+ if ( m_started )
+ {
+ return;
+ }
+ m_thread = new Thread( this, "FlowMetricsCollector" );
+ m_started = true;
+ m_thread.start();
+ }
+ }
+
+ public void shutdown()
+ throws InterruptedException
+ {
+ synchronized ( this )
+ {
+ if ( !m_started )
+ {
+ return;
+ }
+ m_thread.interrupt();
+
+ try
+ {
+ m_thread.join();
+ }
+ finally
+ {
+ m_thread = null;
+ m_started = false;
+ }
+ }
+ }
+
+ public void run()
+ {
+ while ( !Thread.currentThread().isInterrupted() )
+ {
+ long startTime = System.currentTimeMillis();
+
+ Iterator metrics = getFlowMetrics().iterator();
+ while ( metrics.hasNext() )
+ {
+ FlowMetrics fm = (FlowMetrics) metrics.next();
+ fm.collect();
+ }
+
+ long endTime = System.currentTimeMillis();
+ long timeToNext = m_collectionInterval - ( endTime - startTime );
+
+ if ( timeToNext <= 0 )
+ {
+ // TODO FLOWCONTROL should comment this out.
+ System.out.println( "Unable to maintain specified flow metrics collection interval of "
+ + m_collectionInterval + "ms, last collection took: " + ( endTime - startTime ) + "ms" );
+ }
+ else
+ {
+ try
+ {
+ Thread.sleep( timeToNext );
+ }
+ catch ( InterruptedException ie )
+ {
+ return;
+ }
+ }
+ }
+ }
+ }*/
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowRelay.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowRelay.java?rev=752964&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowRelay.java
(added)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowRelay.java
Thu Mar 12 18:40:32 2009
@@ -0,0 +1,5 @@
+package org.apache.activemq.flow;
+
+/**
+ * A flow element that is both an {@link IFlowSink} and an
+ * {@link IFlowSource} for elements of type {@code E}.
+ *
+ * Declares no members of its own; it only combines the two parent
+ * interfaces under one type.
+ *
+ * @param <E> the element type accepted as a sink and emitted as a source
+ */
+public interface IFlowRelay<E> extends IFlowSink<E>, IFlowSource<E> {
+
+}
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ClientConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ClientConnection.java?rev=752964&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ClientConnection.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ClientConnection.java
Thu Mar 12 18:40:32 2009
@@ -0,0 +1,26 @@
+package org.apache.activemq.flow;
+
+import java.net.URI;
+
+import org.apache.activemq.transport.DispatchableTransport;
+import org.apache.activemq.transport.TransportFactory;
+
+/**
+ * Test connection that opens its own transport to a remote URI, in
+ * addition to what {@code AbstractTestConnection} provides.
+ *
+ * NOTE(review): {@code transport}, {@code name} and {@code write(...)} are
+ * not declared in this class; presumably they are inherited from
+ * AbstractTestConnection - confirm against that class.
+ */
+public abstract class ClientConnection extends AbstractTestConnection{
+
+ // Target of the connection; must be set via setConnectUri before start().
+ private URI connectUri;
+
+ /**
+ * Sets the URI that {@link #start()} will connect to.
+ *
+ * @param uri the remote URI
+ */
+ public void setConnectUri(URI uri) {
+ this.connectUri = uri;
+ }
+
+ /**
+ * Connects to the configured URI and starts the connection.
+ *
+ * The order matters: the transport is created and given this object as
+ * its listener, then handed to the superclass, and only after that are
+ * the superclass initialize() and start() invoked. The connection name
+ * is written last.
+ *
+ * NOTE(review): connectUri is passed on as-is; what compositeConnect does
+ * with a null URI is not visible here - verify before relying on it.
+ *
+ * @throws Exception if connecting, initializing, starting or writing fails
+ */
+ public void start() throws Exception {
+ transport = TransportFactory.compositeConnect(connectUri);
+ transport.setTransportListener(this);
+ super.setTransport(transport);
+ super.initialize();
+ super.start();
+ // Let the remote side know our name.
+ write(name);
+ }
+
+}
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockClient.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockClient.java?rev=752964&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockClient.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockClient.java
Thu Mar 12 18:40:32 2009
@@ -0,0 +1,317 @@
+package org.apache.activemq.flow;
+
+import java.io.FileInputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.util.IntrospectionSupport;
+
+/**
+ * Stand-alone test client that creates {@link RemoteProducer} and
+ * {@link RemoteConsumer} connections, starts them on a shared dispatcher
+ * and prints the aggregate producer and consumer rates for a configurable
+ * number of sampling periods.
+ *
+ * All settings are exposed as bean-style properties so that
+ * {@link #main(String[])} can populate them from a properties file through
+ * {@code IntrospectionSupport}.
+ */
+public class MockClient {
+
+    // Number of rate reports printed by reportRates().
+    protected int performanceSamples = 3;
+    // Milliseconds slept between two rate reports.
+    protected int samplingFrequency = 5000;
+
+    protected int numProducers = 1;
+    protected int numConsumers = 1;
+    protected int destCount = 1;
+    protected int numPriorities = 1;
+    protected boolean useInputQueues = false;
+
+    // When true the destinations are created with the ptp flag set and the
+    // report header says "ptp"; otherwise it says "topic".
+    protected boolean ptp = false;
+
+    protected String sendBrokerURI;
+    protected String receiveBrokerURI;
+
+    // Sets the number of threads to use:
+    protected int threadsPerDispatcher = Runtime.getRuntime().availableProcessors();
+
+    protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+    protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+    // Filled by includeInRateReport(...); not read anywhere in this class.
+    protected ArrayList<MetricCounter> additionalReportMetrics = new ArrayList<MetricCounter>();
+    protected boolean includeDetailedRates = false;
+
+    protected IDispatcher dispatcher;
+
+    /**
+     * @param index position in creation order
+     * @return the consumer created at that position
+     */
+    public RemoteConsumer consumer(int index) {
+        return consumers.get(index);
+    }
+
+    /**
+     * @param index position in creation order
+     * @return the producer created at that position
+     */
+    public RemoteProducer producer(int index) {
+        return producers.get(index);
+    }
+
+    public int getThreadsPerDispatcher() {
+        return threadsPerDispatcher;
+    }
+
+    public boolean isUseInputQueues() {
+        return useInputQueues;
+    }
+
+    public void setUseInputQueues(boolean useInputQueues) {
+        this.useInputQueues = useInputQueues;
+    }
+
+    /**
+     * Only takes effect if called before {@link #createDispatcher()}
+     * builds the dispatcher.
+     */
+    public void setThreadsPerDispatcher(int threadPoolSize) {
+        this.threadsPerDispatcher = threadPoolSize;
+    }
+
+    public void setIncludeDetailedRates(boolean includeDetailedRates) {
+        this.includeDetailedRates = includeDetailedRates;
+    }
+
+    public boolean getIncludeDetailedRates() {
+        return includeDetailedRates;
+    }
+
+    /** Adds the producer's rate counter to the additional report metrics. */
+    public void includeInRateReport(RemoteProducer producer) {
+        additionalReportMetrics.add(producer.getRate());
+    }
+
+    /** Adds the consumer's rate counter to the additional report metrics. */
+    public void includeInRateReport(RemoteConsumer consumer) {
+        additionalReportMetrics.add(consumer.getRate());
+    }
+
+    public int getSamplingFrequency() {
+        return samplingFrequency;
+    }
+
+    public void setSamplingFrequency(int samplingFrequency) {
+        this.samplingFrequency = samplingFrequency;
+    }
+
+
+    public int getNumProducers() {
+        return numProducers;
+    }
+
+    public void setNumProducers(int numProducers) {
+        this.numProducers = numProducers;
+    }
+
+    public int getNumConsumers() {
+        return numConsumers;
+    }
+
+    public void setNumConsumers(int numConsumers) {
+        this.numConsumers = numConsumers;
+    }
+
+    public int getDestCount() {
+        return destCount;
+    }
+
+    public void setDestCount(int destCount) {
+        this.destCount = destCount;
+    }
+
+    public int getNumPriorities() {
+        return numPriorities;
+    }
+
+    /**
+     * Only takes effect if called before {@link #createDispatcher()}
+     * builds the dispatcher.
+     */
+    public void setNumPriorities(int numPriorities) {
+        this.numPriorities = numPriorities;
+    }
+
+    public boolean isPtp() {
+        return ptp;
+    }
+
+    public void setPtp(boolean ptp) {
+        this.ptp = ptp;
+    }
+
+    public String getSendBrokerURI() {
+        return sendBrokerURI;
+    }
+
+    public void setSendBrokerURI(String sendBrokerURI) {
+        this.sendBrokerURI = sendBrokerURI;
+    }
+
+    public String getReceiveBrokerURI() {
+        return receiveBrokerURI;
+    }
+
+    public void setReceiveBrokerURI(String receiveBrokerURI) {
+        this.receiveBrokerURI = receiveBrokerURI;
+    }
+
+    public int getPerformanceSamples() {
+        return performanceSamples;
+    }
+
+
+    // Shared by every producer created through createProducer(...).
+    protected final AtomicLong msgIdGenerator = new AtomicLong();
+
+    final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
+    final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
+
+    private String testName;
+
+    /**
+     * Builds a consumer named "consumer" + (i + 1) for the destination and
+     * adds it to the consumer list. The consumer is not started here.
+     *
+     * @throws URISyntaxException if connectUri is not a valid URI
+     */
+    private void createConsumer(int i, String connectUri, Destination destination) throws URISyntaxException {
+        RemoteConsumer consumer = new RemoteConsumer();
+        consumer.setDestination(destination);
+        consumer.setName("consumer" + (i + 1));
+        consumer.setTotalConsumerRate(totalConsumerRate);
+        consumer.setDispatcher(dispatcher);
+        consumer.setConnectUri(new URI(connectUri));
+        consumer.setUseInputQueue(useInputQueues);
+        consumers.add(consumer);
+    }
+
+    /**
+     * Builds a producer named "producer" + (id + 1) for the destination and
+     * adds it to the producer list. The producer is not started here.
+     *
+     * @throws URISyntaxException if connectUri is not a valid URI
+     */
+    private void createProducer(int id, String connectUri, Destination destination) throws URISyntaxException {
+        RemoteProducer producer = new RemoteProducer();
+        producer.setProducerId(id + 1);
+        producer.setName("producer" + (id + 1));
+        producer.setDestination(destination);
+        producer.setMessageIdGenerator(msgIdGenerator);
+        producer.setTotalProducerRate(totalProducerRate);
+        producer.setDispatcher(dispatcher);
+        producer.setUseInputQueue(useInputQueues);
+        producer.setConnectUri(new URI(connectUri));
+        producers.add(producer);
+    }
+
+    /**
+     * Prints performanceSamples rate reports, sleeping samplingFrequency
+     * milliseconds before each one and resetting both aggregates after it.
+     *
+     * @throws InterruptedException if the sleep is interrupted
+     */
+    private void reportRates() throws InterruptedException {
+        System.out.println("Checking rates for test: " + getTestName() + ", " + (ptp ? "ptp" : "topic"));
+        for (int i = 0; i < performanceSamples; i++) {
+            Period p = new Period();
+            Thread.sleep(samplingFrequency);
+            System.out.println(totalProducerRate.getRateSummary(p));
+            System.out.println(totalConsumerRate.getRateSummary(p));
+            if (includeDetailedRates) {
+                System.out.println(totalProducerRate.getChildRateSummary(p));
+                System.out.println(totalConsumerRate.getChildRateSummary(p));
+            }
+            totalProducerRate.reset();
+            totalConsumerRate.reset();
+        }
+    }
+
+    public void setTestName(String testName) {
+        this.testName = testName;
+    }
+
+    public void setPerformanceSamples(int samples) {
+        this.performanceSamples = samples;
+    }
+
+    public String getTestName() {
+        return testName;
+    }
+
+    public void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    /**
+     * Starts the dispatcher and all connections, prints the rate reports,
+     * and always stops the connections afterwards.
+     *
+     * @throws Exception if starting, reporting or stopping fails
+     */
+    public void runTest() throws Exception {
+        getDispatcher().start();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /** Clears the shutdown flag, then starts consumers before producers. */
+    private void startServices() throws Exception {
+        AbstractTestConnection.setInShutdown(false, dispatcher);
+        for (RemoteConsumer connection : consumers) {
+            connection.start();
+        }
+
+        for (RemoteProducer connection : producers) {
+            connection.start();
+        }
+    }
+
+    /** Sets the shutdown flag, then stops producers before consumers. */
+    private void stopServices() throws Exception {
+
+        AbstractTestConnection.setInShutdown(true, dispatcher);
+        for (RemoteProducer connection : producers) {
+            connection.stop();
+        }
+        for (RemoteConsumer connection : consumers) {
+            connection.stop();
+        }
+    }
+
+    /**
+     * Creates destCount destinations named "dest1".."destN" and spreads
+     * numProducers producers and numConsumers consumers over them
+     * round-robin. Nothing is started here.
+     *
+     * @throws Exception if a broker URI cannot be parsed
+     */
+    public void createConnections() throws Exception {
+
+        DestinationBuffer[] dests = new DestinationBuffer[destCount];
+
+        for (int i = 0; i < destCount; i++) {
+            DestinationBean bean = new DestinationBean();
+            bean.setName(new AsciiBuffer("dest" + (i + 1)));
+            bean.setPtp(ptp);
+            dests[i] = bean.freeze();
+        }
+
+        for (int i = 0; i < numProducers; i++) {
+            Destination destination = dests[i % destCount];
+            createProducer(i, sendBrokerURI, destination);
+        }
+
+        for (int i = 0; i < numConsumers; i++) {
+            Destination destination = dests[i % destCount];
+            createConsumer(i, receiveBrokerURI, destination);
+        }
+    }
+
+    public IDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    /**
+     * Lazily creates the dispatcher from the current numPriorities and
+     * threadsPerDispatcher values; a dispatcher that is already set is
+     * returned unchanged.
+     *
+     * @return the dispatcher in use
+     */
+    protected IDispatcher createDispatcher() {
+        if (dispatcher == null) {
+            dispatcher = PriorityDispatcher.createPriorityDispatchPool("ClientDispatcher", numPriorities, threadsPerDispatcher);
+        }
+        return dispatcher;
+    }
+
+    /**
+     * Runs the mock client as a standalone application.
+     *
+     * @param args
+     *            optional; when present, args[0] is the path of a properties
+     *            file whose entries are applied to the client's setters
+     * @throws Exception
+     *             if the properties file cannot be read or the test fails
+     */
+    public static void main(String[] args) throws Exception {
+        MockClient test = new MockClient();
+
+        Properties props = new Properties();
+        if (args.length > 0) {
+            // Properties.load leaves the stream open, so close it here.
+            FileInputStream in = new FileInputStream(args[0]);
+            try {
+                props.load(in);
+            } finally {
+                in.close();
+            }
+            IntrospectionSupport.setProperties(test, props);
+        }
+
+        // Build the dispatcher only after the properties are applied, so a
+        // configured numPriorities / threadsPerDispatcher is honoured.
+        test.createDispatcher();
+
+        System.out.println(IntrospectionSupport.toString(test));
+        try
+        {
+            // NOTE(review): runTest() starts the dispatcher again; whether a
+            // second start() is harmless depends on IDispatcher - confirm.
+            test.getDispatcher().start();
+            test.createConnections();
+            test.runTest();
+        }
+        finally
+        {
+            test.getDispatcher().shutdown();
+        }
+    }
+
+}