Updated Branches:
  refs/heads/javelin f52950689 -> 11e9baca3

Add server side transport driver


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/11e9baca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/11e9baca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/11e9baca

Branch: refs/heads/javelin
Commit: 11e9baca371a962d397a84fceb20d25007082e04
Parents: f529506
Author: Kelven Yang <[email protected]>
Authored: Wed Dec 12 11:44:53 2012 -0800
Committer: Kelven Yang <[email protected]>
Committed: Wed Dec 12 11:44:53 2012 -0800

----------------------------------------------------------------------
 .../framework/messaging/TransportEndpointSite.java |   33 ++++++++-
 .../framework/messaging/TransportProvider.java     |    2 +
 .../messaging/client/ClientTransportProvider.java  |    5 ++
 .../messaging/server/ServerTransportProvider.java  |   52 ++++++++++++++-
 4 files changed, 86 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/11e9baca/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
index ca6155b..82ed9f5 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
@@ -24,18 +24,25 @@ import java.util.List;
 import java.util.Map;
 
 public class TransportEndpointSite {
+       private TransportProvider _provider;
        private TransportEndpoint _endpoint;
        private TransportAddress _address;
        
        private List<TransportPdu> _outputQueue = new ArrayList<TransportPdu>();
        private Map<String, TransportMultiplexier> _multiplexierMap = new 
HashMap<String, TransportMultiplexier>();  
        
-       public TransportEndpointSite(TransportEndpoint endpoint, 
TransportAddress address) {
+       private int _outstandingSignalRequests;
+       
+       public TransportEndpointSite(TransportProvider provider, 
TransportEndpoint endpoint, TransportAddress address) {
+               assert(provider != null);
                assert(endpoint != null);
                assert(address != null);
                
+               _provider = provider;
                _endpoint = endpoint;
                _address = address;
+               
+               _outstandingSignalRequests = 0;
        }
        
        public TransportEndpoint getEndpoint() {
@@ -68,7 +75,7 @@ public class TransportEndpointSite {
                        _outputQueue.add(pdu);
                }
                
-               processOutput();
+               signalOutputProcessRequest();
        }
        
        public TransportPdu getNextOutputPdu() {
@@ -80,7 +87,7 @@ public class TransportEndpointSite {
                return null;
        }
        
-       private void processOutput() {
+       public void processOutput() {
                TransportPdu pdu;
                TransportEndpoint endpoint = getEndpoint();
 
@@ -104,4 +111,24 @@ public class TransportEndpointSite {
                
                return multiplexier;
        }
+       
+       private void signalOutputProcessRequest() {
+               boolean proceed = false;
+               synchronized(this) {
+                       if(_outstandingSignalRequests == 0) {
+                               _outstandingSignalRequests++;
+                               proceed = true;
+                       }
+               }
+               
+               if(proceed)
+                       _provider.requestSiteOutput(this);
+       }
+       
+       public void ackOutputProcessSignal() {
+               synchronized(this) {
+                       assert(_outstandingSignalRequests == 1);
+                       _outstandingSignalRequests--;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/11e9baca/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
index bdbdd17..e25407f 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
@@ -22,6 +22,8 @@ public interface TransportProvider {
        TransportEndpointSite attach(TransportEndpoint endpoint, String 
predefinedAddress);
        boolean detach(TransportEndpoint endpoint);
        
+       void requestSiteOutput(TransportEndpointSite site);
+       
        void sendMessage(String soureEndpointAddress, String 
targetEndpointAddress, 
                String multiplexier, String message);
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/11e9baca/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
index 60c07c3..c2bbef7 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
@@ -38,6 +38,11 @@ public class ClientTransportProvider implements 
TransportProvider {
        }
        
        @Override
+       public void requestSiteOutput(TransportEndpointSite site) {
+               // ???
+       }
+       
+       @Override
        public void sendMessage(String soureEndpointAddress, String 
targetEndpointAddress, 
                String multiplexier, String message) {
                // TODO

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/11e9baca/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
index 3372b75..014c8fe 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
@@ -21,6 +21,8 @@ package org.apache.cloudstack.framework.messaging.server;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.cloudstack.framework.messaging.TransportAddress;
 import org.apache.cloudstack.framework.messaging.TransportDataPdu;
@@ -28,20 +30,48 @@ import 
org.apache.cloudstack.framework.messaging.TransportEndpoint;
 import org.apache.cloudstack.framework.messaging.TransportEndpointSite;
 import org.apache.cloudstack.framework.messaging.TransportPdu;
 import org.apache.cloudstack.framework.messaging.TransportProvider;
+import org.apache.log4j.Logger;
+
+import com.cloud.utils.concurrency.NamedThreadFactory;
 
 public class ServerTransportProvider implements TransportProvider {
+    private static final Logger s_logger = 
Logger.getLogger(ServerTransportProvider.class);
+       
+       public static final int DEFAULT_WORKER_POOL_SIZE = 5;
+       
        private String _nodeId;
 
        private Map<String, TransportEndpointSite> _endpointMap = new 
HashMap<String, TransportEndpointSite>();
+       private int _poolSize = DEFAULT_WORKER_POOL_SIZE;
+       private ExecutorService _executor;
        
        private int _nextEndpointId = new Random().nextInt();
        
        public ServerTransportProvider() {
        }
        
-       public String getNodeId() { return _nodeId; }
-       public void setNodeId(String nodeId) {
+       public String getNodeId() { 
+               return _nodeId; 
+       }
+       
+       public ServerTransportProvider setNodeId(String nodeId) {
                _nodeId = nodeId;
+               return this;
+       }
+       
+       public int getWorkerPoolSize() {
+               return _poolSize; 
+       }
+       
+       public ServerTransportProvider setWorkerPoolSize(int poolSize) {
+               assert(poolSize > 0);
+               
+               _poolSize = poolSize;
+               return this;
+       }
+       
+       public void initialize() {
+               _executor = Executors.newFixedThreadPool(_poolSize, new 
NamedThreadFactory("Transport-Worker"));
        }
        
        @Override
@@ -64,7 +94,7 @@ public class ServerTransportProvider implements 
TransportProvider {
                                // already attached
                                return endpointSite;
                        }
-                       endpointSite = new TransportEndpointSite(endpoint, 
transportAddress);
+                       endpointSite = new TransportEndpointSite(this, 
endpoint, transportAddress);
                        _endpointMap.put(endpointId, endpointSite);
                }
                
@@ -87,6 +117,22 @@ public class ServerTransportProvider implements 
TransportProvider {
        }
        
        @Override
+       public void requestSiteOutput(final TransportEndpointSite site) {
+               _executor.execute(new Runnable() {
+
+                       @Override
+                       public void run() {
+                               try {
+                                       site.processOutput();
+                                       site.ackOutputProcessSignal();
+                               } catch(Throwable e) {
+                                       s_logger.error("Unhandled exception", 
e);
+                               }
+                       }
+               });
+       }
+       
+       @Override
        public void sendMessage(String sourceEndpointAddress, String 
targetEndpointAddress, 
                String multiplexier, String message) {
                

Reply via email to