Updated Branches: refs/heads/javelin f52950689 -> 11e9baca3
Add server side transport driver Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/11e9baca Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/11e9baca Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/11e9baca Branch: refs/heads/javelin Commit: 11e9baca371a962d397a84fceb20d25007082e04 Parents: f529506 Author: Kelven Yang <[email protected]> Authored: Wed Dec 12 11:44:53 2012 -0800 Committer: Kelven Yang <[email protected]> Committed: Wed Dec 12 11:44:53 2012 -0800 ---------------------------------------------------------------------- .../framework/messaging/TransportEndpointSite.java | 33 ++++++++- .../framework/messaging/TransportProvider.java | 2 + .../messaging/client/ClientTransportProvider.java | 5 ++ .../messaging/server/ServerTransportProvider.java | 52 ++++++++++++++- 4 files changed, 86 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/11e9baca/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java index ca6155b..82ed9f5 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java @@ -24,18 +24,25 @@ import java.util.List; import java.util.Map; public class TransportEndpointSite { + private TransportProvider _provider; private TransportEndpoint _endpoint; private TransportAddress _address; private List<TransportPdu> _outputQueue = new ArrayList<TransportPdu>(); private Map<String, TransportMultiplexier> _multiplexierMap = new HashMap<String, TransportMultiplexier>(); - public TransportEndpointSite(TransportEndpoint endpoint, TransportAddress address) { + private int _outstandingSignalRequests; + + public TransportEndpointSite(TransportProvider provider, TransportEndpoint endpoint, TransportAddress address) { + assert(provider != null); assert(endpoint != null); assert(address != null); + _provider = provider; _endpoint = endpoint; _address = address; + + _outstandingSignalRequests = 0; } public TransportEndpoint getEndpoint() { @@ -68,7 +75,7 @@ public class TransportEndpointSite { _outputQueue.add(pdu); } - processOutput(); + signalOutputProcessRequest(); } public TransportPdu getNextOutputPdu() { @@ -80,7 +87,7 @@ public class TransportEndpointSite { return null; } - private void processOutput() { + public void processOutput() { TransportPdu pdu; TransportEndpoint endpoint = getEndpoint(); @@ -104,4 +111,24 @@ public class TransportEndpointSite { return multiplexier; } + + private void signalOutputProcessRequest() { + boolean proceed = false; + synchronized(this) { + if(_outstandingSignalRequests == 0) { + _outstandingSignalRequests++; + proceed = true; + } + } + + if(proceed) + _provider.requestSiteOutput(this); + } + + public void ackOutputProcessSignal() { + synchronized(this) { + assert(_outstandingSignalRequests == 1); + _outstandingSignalRequests--; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/11e9baca/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java index bdbdd17..e25407f 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java @@ -22,6 +22,8 @@ public interface TransportProvider { TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress); boolean detach(TransportEndpoint endpoint); + void requestSiteOutput(TransportEndpointSite site); + void sendMessage(String soureEndpointAddress, String targetEndpointAddress, String multiplexier, String message); } http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/11e9baca/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java index 60c07c3..c2bbef7 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java @@ -38,6 +38,11 @@ public class ClientTransportProvider implements TransportProvider { } @Override + public void requestSiteOutput(TransportEndpointSite site) { + // ??? + } + + @Override public void sendMessage(String soureEndpointAddress, String targetEndpointAddress, String multiplexier, String message) { // TODO http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/11e9baca/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java index 3372b75..014c8fe 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java @@ -21,6 +21,8 @@ package org.apache.cloudstack.framework.messaging.server; import java.util.HashMap; import java.util.Map; import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.cloudstack.framework.messaging.TransportAddress; import org.apache.cloudstack.framework.messaging.TransportDataPdu; @@ -28,20 +30,48 @@ import org.apache.cloudstack.framework.messaging.TransportEndpoint; import org.apache.cloudstack.framework.messaging.TransportEndpointSite; import org.apache.cloudstack.framework.messaging.TransportPdu; import org.apache.cloudstack.framework.messaging.TransportProvider; +import org.apache.log4j.Logger; + +import com.cloud.utils.concurrency.NamedThreadFactory; public class ServerTransportProvider implements TransportProvider { + private static final Logger s_logger = Logger.getLogger(ServerTransportProvider.class); + + public static final int DEFAULT_WORKER_POOL_SIZE = 5; + private String _nodeId; private Map<String, TransportEndpointSite> _endpointMap = new HashMap<String, TransportEndpointSite>(); + private int _poolSize = DEFAULT_WORKER_POOL_SIZE; + private ExecutorService _executor; private int _nextEndpointId = new Random().nextInt(); public ServerTransportProvider() { } - public String getNodeId() { return _nodeId; } - public void setNodeId(String nodeId) { + public String getNodeId() { + return _nodeId; + } + + public ServerTransportProvider setNodeId(String nodeId) { _nodeId = nodeId; + return this; + } + + public int getWorkerPoolSize() { + return _poolSize; + } + + public ServerTransportProvider setWorkerPoolSize(int poolSize) { + assert(poolSize > 0); + + _poolSize = poolSize; + return this; + } + + public void initialize() { + _executor = Executors.newFixedThreadPool(_poolSize, new NamedThreadFactory("Transport-Worker")); } @Override @@ -64,7 +94,7 @@ public class ServerTransportProvider implements TransportProvider { // already attached return endpointSite; } - endpointSite = new TransportEndpointSite(endpoint, transportAddress); + endpointSite = new TransportEndpointSite(this, endpoint, transportAddress); _endpointMap.put(endpointId, endpointSite); } @@ -87,6 +117,22 @@ public class ServerTransportProvider implements TransportProvider { } @Override + public void requestSiteOutput(final TransportEndpointSite site) { + _executor.execute(new Runnable() { + + @Override + public void run() { + try { + site.processOutput(); + site.ackOutputProcessSignal(); + } catch(Throwable e) { + s_logger.error("Unhandled exception", e); + } + } + }); + } + + @Override public void sendMessage(String sourceEndpointAddress, String targetEndpointAddress, String multiplexier, String message) {
