Updated Branches:
  refs/heads/javelin 225ad3c28 -> 1d7506321

Finish RPC calling side implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/1d750632
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/1d750632
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/1d750632

Branch: refs/heads/javelin
Commit: 1d75063217612b0695a9e430ed223ede31ffb7f5
Parents: 225ad3c
Author: Kelven Yang <[email protected]>
Authored: Tue Nov 27 20:31:10 2012 -0800
Committer: Kelven Yang <[email protected]>
Committed: Tue Nov 27 20:31:33 2012 -0800

----------------------------------------------------------------------
 .../framework/messaging/RpcCallRequestPdu.java     |   66 +++++++
 .../framework/messaging/RpcCallResponsePdu.java    |   78 +++++++++
 .../framework/messaging/RpcClientCall.java         |    2 +
 .../framework/messaging/RpcClientCallImpl.java     |  134 +++++++++++++--
 .../framework/messaging/RpcProvider.java           |    8 +-
 .../framework/messaging/RpcProviderImpl.java       |   92 +++++++++-
 .../framework/messaging/TransportProvider.java     |    3 +
 .../messaging/client/ClientTransportProvider.java  |    7 +
 .../messaging/server/ServerTransportProvider.java  |    6 +
 9 files changed, 374 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/1d750632/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallRequestPdu.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallRequestPdu.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallRequestPdu.java
new file mode 100644
index 0000000..0992116
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallRequestPdu.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+/**
+ * Protocol data unit describing one RPC request: which command to run, its
+ * serialized argument, and the bookkeeping values (tag, start tick) attached
+ * to the call. RpcClientCallImpl.apply() fills one of these in and sends it
+ * in serialized form through the RpcProvider.
+ *
+ * NOTE(review): field names are presumably part of the on-wire format
+ * produced by MessageSerializer - confirm before renaming anything.
+ */
+@OnwireName(name="RpcRequest")
+public class RpcCallRequestPdu {
+       
+       // tag of the originating call; apply() copies the call tag in here
+       private long requestTag;
+       // millisecond wall-clock time, defaulted to the construction time of this PDU
+       private long requestStartTick;
+       
+       // name of the command to invoke
+       private String command;
+       // command argument in serialized form; stays null when the call has no argument
+       private String serializedCommandArg;
+
+       // Creates a PDU with tag 0 and the start tick set to the current time.
+       public RpcCallRequestPdu() {
+               requestTag = 0;
+               requestStartTick = System.currentTimeMillis();
+       }
+       
+       public long getRequestTag() {
+               return requestTag;
+       }
+       
+       public void setRequestTag(long requestTag) {
+               this.requestTag = requestTag;
+       }
+       
+       public long getRequestStartTick() {
+               return requestStartTick;
+       }
+       
+       public void setRequestStartTick(long requestStartTick) {
+               this.requestStartTick = requestStartTick;
+       }
+       
+       public String getCommand() {
+               return command;
+       }
+       
+       public void setCommand(String command) {
+               this.command = command;
+       }
+       
+       public String getSerializedCommandArg() {
+               return serializedCommandArg;
+       }
+       
+       public void setSerializedCommandArg(String serializedCommandArg) {
+               this.serializedCommandArg = serializedCommandArg;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/1d750632/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallResponsePdu.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallResponsePdu.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallResponsePdu.java
new file mode 100644
index 0000000..ca882e9
--- /dev/null
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallResponsePdu.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+/**
+ * Protocol data unit describing the outcome of one RPC request. The provider
+ * looks up the outstanding call by getRequestTag() and completes it according
+ * to getResult().
+ *
+ * NOTE(review): field names are presumably part of the on-wire format
+ * produced by MessageSerializer - confirm before renaming anything.
+ */
+@OnwireName(name="RpcResponse")
+public class RpcCallResponsePdu {
+       // the call succeeded; getSerializedResult() carries the serialized return value
+       public static final int RESULT_SUCCESSFUL = 0;
+       // reported to the caller as RpcException("Handler does not exist")
+       public static final int RESULT_HANDLER_NOT_EXIST = 1;
+       // reported to the caller as RpcException("Exception in handler")
+       public static final int RESULT_HANDLER_EXCEPTION = 2;
+       
+       // tag used to find the outstanding call this response belongs to
+       private long requestTag;
+       // NOTE(review): presumably echoes the request's start tick - confirm against the callee side
+       private long requestStartTick;
+       
+       // one of the RESULT_* codes above
+       private int result;
+       private String command;
+       // serialized return value; only consumed when result is RESULT_SUCCESSFUL
+       private String serializedResult;
+       
+       // Creates a response with tag 0 and start tick 0; result defaults to 0 (RESULT_SUCCESSFUL).
+       public RpcCallResponsePdu() {
+               requestTag = 0;
+               requestStartTick = 0;
+       }
+
+       public long getRequestTag() {
+               return requestTag;
+       }
+       
+       public void setRequestTag(long requestTag) {
+               this.requestTag = requestTag;
+       }
+       
+       public long getRequestStartTick() {
+               return requestStartTick;
+       }
+       
+       public void setRequestStartTick(long requestStartTick) {
+               this.requestStartTick = requestStartTick;
+       }
+       
+       public int getResult() {
+               return result;
+       }
+       
+       public void setResult(int result) {
+               this.result = result;
+       }
+       
+       public String getCommand() {
+               return command;
+       }
+       
+       public void setCommand(String command) {
+               this.command = command;
+       }
+       
+       public String getSerializedResult() {
+               return serializedResult;
+       }
+       
+       public void setSerializedResult(String serializedResult) {
+               this.serializedResult = serializedResult;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/1d750632/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
index 4582568..d2adbfd 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
@@ -19,6 +19,8 @@
 package org.apache.cloudstack.framework.messaging;
 
 public interface RpcClientCall {
+       final static int DEFAULT_RPC_TIMEOUT = 10000;
+       
        String getCommand();
        RpcClientCall setCommand(String cmd);
        RpcClientCall setTimeout(int timeoutMilliseconds);

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/1d750632/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
index 5dd1d94..574a273 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
@@ -18,7 +18,9 @@
  */
 package org.apache.cloudstack.framework.messaging;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class RpcClientCallImpl implements RpcClientCall {
@@ -26,17 +28,30 @@ public class RpcClientCallImpl implements RpcClientCall {
        private String _command;
        private Object _commandArg;
        
-       private int _timeoutMilliseconds;
-       
+       private int _timeoutMilliseconds = DEFAULT_RPC_TIMEOUT;
        private Map<String, Object> _contextParams = new HashMap<String, 
Object>();
+       private boolean _oneway = false;
+       
+       private List<RpcCallbackListener> _callbackListeners = new 
ArrayList<RpcCallbackListener>();
+
+       private RpcProvider _rpcProvider;
+       private long _startTickInMs;
+       private long _callTag;
+       private String _sourceAddress;
+       private String _targetAddress;
        
-       public RpcClientCallImpl() {
+       private Object _responseLock = new Object();
+       private boolean _responseDone = false;;
+       private Object _responseResult;
+               
+       public RpcClientCallImpl(RpcProvider rpcProvider) {
+               assert(rpcProvider != null);
+               _rpcProvider = rpcProvider;
        }
        
        @Override
        public String getCommand() {
-               // TODO Auto-generated method stub
-               return null;
+               return _command;
        }
 
        @Override
@@ -71,37 +86,126 @@ public class RpcClientCallImpl implements RpcClientCall {
 
        @Override
        public Object getContextParam(String key) {
-               // TODO Auto-generated method stub
-               return null;
+               return _contextParams.get(key);
        }
 
        @Override
        public <T> RpcClientCall addCallbackListener(RpcCallbackListener<T> 
listener) {
-               // TODO Auto-generated method stub
-               return null;
+               assert(listener != null);
+               _callbackListeners.add(listener);
+               return this;
        }
 
        @Override
        public RpcClientCall setOneway() {
-               // TODO Auto-generated method stub
-               return null;
+               _oneway = true;
+               return this;
+       }
+       
+       // Address this call is sent from; apply() hands it to RpcProvider.sendRpcPdu().
+       public String getSourceAddress() {
+               return _sourceAddress;
+       }
+       
+       public void setSourceAddress(String sourceAddress) {
+               _sourceAddress = sourceAddress;
+       }
+       
+       // Address this call is sent to; apply() hands it to RpcProvider.sendRpcPdu().
+       public String getTargetAddress() {
+               return _targetAddress;
+       }
+       
+       public void setTargetAddress(String targetAddress) {
+               _targetAddress = targetAddress;
+       }
+       
+       // Tag identifying this call; it is written into the request PDU and is the
+       // key under which the provider tracks the call while it is outstanding.
+       public long getCallTag() {
+               return _callTag;
+       }
+       
+       public void setCallTag(long callTag) {
+               _callTag = callTag;
        }
 
        @Override
        public void apply() {
-               // TODO Auto-generated method stub
+               // sanity check
+               assert(_sourceAddress != null);
+               assert(_targetAddress != null);
+               
+               if(!_oneway)
+                       _rpcProvider.registerCall(this);
                
+               RpcCallRequestPdu pdu = new RpcCallRequestPdu();
+               pdu.setCommand(getCommand());
+               if(_commandArg != null)
+                       
pdu.setSerializedCommandArg(_rpcProvider.getMessageSerializer().serializeTo(_commandArg.getClass(),
 _commandArg));
+               pdu.setRequestTag(this.getCallTag());
+               
+               _rpcProvider.sendRpcPdu(getSourceAddress(), getTargetAddress(), 
+                       
_rpcProvider.getMessageSerializer().serializeTo(RpcCallRequestPdu.class, pdu));
        }
 
        @Override
        public void cancel() {
-               // TODO Auto-generated method stub
-               
+               _rpcProvider.cancelCall(this);
        }
 
        @Override
        public <T> T get() {
-               // TODO Auto-generated method stub
+               if(!_oneway) {
+                       synchronized(_responseLock) {
+                               if(!_responseDone) {
+                                       long timeToWait = _timeoutMilliseconds 
- (System.currentTimeMillis() - _startTickInMs);
+                                       if(timeToWait < 0)
+                                               timeToWait = 0;
+                                       
+                                       try {
+                                               _responseLock.wait(timeToWait);
+                                       } catch (InterruptedException e) {
+                                               throw new 
RpcTimeoutException("RPC call timed out");
+                                       }
+                               }
+                               
+                               assert(_responseDone);
+                               
+                               if(_responseResult == null)
+                                       return null;
+                               
+                               if(_responseResult instanceof RpcException)
+                                       throw (RpcException)_responseResult;
+                               
+                               assert(_rpcProvider.getMessageSerializer() != 
null);
+                               assert(_responseResult instanceof String);
+                               return 
_rpcProvider.getMessageSerializer().serializeFrom((String)_responseResult);
+                       }
+               }
                return null;
        }
+       
+       /**
+        * Completes this call successfully: wakes up any thread blocked in get()
+        * and then notifies the registered callback listeners.
+        *
+        * @param result the serialized result; may be null when the response
+        *        carried no value
+        */
+       public void complete(String result) {
+               synchronized(_responseLock) {
+                       // publish the result under the same lock get() reads it under
+                       _responseResult = result;
+                       _responseDone = true;
+                       _responseLock.notifyAll();
+               }
+               
+               assert(_rpcProvider.getMessageSerializer() != null);
+               // get() maps a null serialized result to a null return value; give
+               // listeners the same view instead of handing null to the serializer
+               Object resultObject = null;
+               if(result != null)
+                       resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result);
+               for(RpcCallbackListener listener: _callbackListeners)
+                       listener.onSuccess(resultObject);
+       }
+       
+       /**
+        * Completes this call with a failure: stores the exception as the call's
+        * result (get() rethrows it), wakes up any thread blocked in get(), and
+        * then reports the failure to every registered callback listener.
+        */
+       public void complete(RpcException e) {
+               _responseResult = e;
+               
+               synchronized(_responseLock) {
+                       _responseDone = true;
+                       
+                       _responseLock.notifyAll();
+               }
+               
+               // listeners are invoked after the lock has been released
+               for(RpcCallbackListener listener: _callbackListeners)
+                       listener.onFailure(e);
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/1d750632/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
index 334e09d..908912a 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
@@ -19,11 +19,17 @@
 package org.apache.cloudstack.framework.messaging;
 
 public interface RpcProvider extends TransportMultiplexier {
+       final static String RPC_MULTIPLEXIER = "rpc";
+       
        void setMessageSerializer(MessageSerializer messageSerializer);
        MessageSerializer getMessageSerializer();
        
        void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
        void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
        
-       RpcClientCall target(String target);
+       RpcClientCall newCall(String sourceAddress, String targetAddress);
+       void registerCall(RpcClientCall call);
+       void cancelCall(RpcClientCall call);
+       
+       void sendRpcPdu(String sourceAddress, String targetAddress, String 
serializedPdu);
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/1d750632/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
index fdbb27d..c652982 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
@@ -19,13 +19,18 @@
 package org.apache.cloudstack.framework.messaging;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class RpcProviderImpl implements RpcProvider {
        
+       private TransportProvider _transportProvider;
        private MessageSerializer _messageSerializer;
        private List<RpcServiceEndpoint> _serviceEndpoints = new 
ArrayList<RpcServiceEndpoint>();
-       private TransportProvider _transportProvider;
+       private Map<Long, RpcClientCall> _outstandingCalls = new HashMap<Long, 
RpcClientCall>();
+       
+       private long _nextCallTag = System.currentTimeMillis(); 
        
        public RpcProviderImpl() {
        }
@@ -41,9 +46,16 @@ public class RpcProviderImpl implements RpcProvider {
        @Override
        public void onTransportMessage(String senderEndpointAddress,
                String targetEndpointAddress, String multiplexer, String 
message) {
-
-               // TODO Auto-generated method stub
+               assert(_messageSerializer != null);
                
+               Object pdu = _messageSerializer.serializeFrom(message);
+               if(pdu instanceof RpcCallRequestPdu) {
+                       handleCallRequestPdu((RpcCallRequestPdu)pdu);
+               } else if(pdu instanceof RpcCallResponsePdu) {
+                       handleCallResponsePdu((RpcCallResponsePdu)pdu);
+               } else {
+                       assert(false);
+               }
        }
 
        @Override
@@ -71,8 +83,76 @@ public class RpcProviderImpl implements RpcProvider {
        }
 
        @Override
-       public RpcClientCall target(String target) {
-               // TODO Auto-generated method stub
-               return null;
+       public RpcClientCall newCall(String sourceAddress, String 
targetAddress) {
+               long callTag = getNextCallTag();
+               RpcClientCallImpl call = new RpcClientCallImpl(this);
+               call.setSourceAddress(sourceAddress);
+               call.setTargetAddress(targetAddress);
+               call.setCallTag(callTag);
+               
+               return call;
+       }
+       
+       /**
+        * Records a call as outstanding, keyed by its call tag, so that a later
+        * response PDU carrying the same tag can be matched to it. The call is
+        * cast to RpcClientCallImpl to read the tag.
+        */
+       @Override
+       public void registerCall(RpcClientCall call) {
+               assert(call != null);
+               // the outstanding-call map is only touched while holding this provider's monitor
+               synchronized(this) {
+                       
_outstandingCalls.put(((RpcClientCallImpl)call).getCallTag(), call);
+               }
+       }
+       
+       /**
+        * Removes the call from the outstanding-call map and completes it with an
+        * RpcException. The call is cast to RpcClientCallImpl.
+        *
+        * NOTE(review): the call is completed even when it was no longer in the
+        * map - confirm whether cancelling an already-completed call should be a
+        * no-op instead.
+        */
+       @Override
+       public void cancelCall(RpcClientCall call) {
+               synchronized(this) {
+                       
_outstandingCalls.remove(((RpcClientCallImpl)call).getCallTag());
+               }
+               
+               // completion happens outside the monitor; it wakes waiters and runs listener callbacks
+               ((RpcClientCallImpl)call).complete(new RpcException("Call is 
cancelled"));
+       }
+       
+       /**
+        * Hands an already-serialized RPC PDU to the transport provider,
+        * addressed to the RPC multiplexier (RpcProvider.RPC_MULTIPLEXIER).
+        */
+       @Override
+       public void sendRpcPdu(String sourceAddress, String targetAddress, 
String serializedPdu) {
+               assert(_transportProvider != null);
+               _transportProvider.sendMessage(sourceAddress, targetAddress, 
this.RPC_MULTIPLEXIER, serializedPdu);
+       }
+       
+       /**
+        * Hands out the next call tag. Tags come from a counter seeded with the
+        * provider's creation time; the value 0 is never returned.
+        * NOTE(review): 0 is presumably reserved as "no tag" because both PDU
+        * classes default their requestTag to 0 - confirm.
+        *
+        * @return a non-zero tag not handed out before by this provider
+        */
+       protected synchronized long getNextCallTag() {
+               long tag = _nextCallTag++;
+               if(tag == 0) {
+                       // Skip 0 by consuming one more counter value. Bumping only the
+                       // local copy would return 1 here and 1 again on the next call.
+                       tag = _nextCallTag++;
+               }
+               
+               return tag;
+       }
+       
+       // Callee-side handling of an incoming call request. Not implemented yet:
+       // the body is empty, so request PDUs are currently ignored.
+       private void handleCallRequestPdu(RpcCallRequestPdu pdu) {
+               // ???
+       }
+       
+       /**
+        * Routes a response PDU to the outstanding call with the same tag and
+        * completes that call according to the PDU's result code. Responses with
+        * no matching outstanding call (for example one that was cancelled
+        * earlier) are dropped.
+        */
+       private void handleCallResponsePdu(RpcCallResponsePdu pdu) {
+               RpcClientCallImpl call = null;
+               
+               // removing under the monitor ensures at most one response completes a given call
+               synchronized(this) {
+                       call = (RpcClientCallImpl)_outstandingCalls.remove(pdu.getRequestTag());
+               }
+               
+               if(call != null) {
+                       switch(pdu.getResult()) {
+                       case RpcCallResponsePdu.RESULT_SUCCESSFUL :
+                               call.complete(pdu.getSerializedResult());
+                               break;
+                               
+                       case RpcCallResponsePdu.RESULT_HANDLER_NOT_EXIST :
+                               call.complete(new RpcException("Handler does not exist"));
+                               break;
+                               
+                       case RpcCallResponsePdu.RESULT_HANDLER_EXCEPTION :
+                               call.complete(new RpcException("Exception in handler"));
+                               break;
+                               
+                       default :
+                               // The call has already been removed from the map, so it must
+                               // still be completed; otherwise waiters and listeners would
+                               // never be notified. The code comes from the remote side, so
+                               // report it as a failure rather than asserting.
+                               call.complete(new RpcException("Unknown RPC result code: " + pdu.getResult()));
+                               break;
+                       }
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/1d750632/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
index 6773e8d..c843b06 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
@@ -21,4 +21,7 @@ package org.apache.cloudstack.framework.messaging;
 public interface TransportProvider {
        void attach(TransportEndpoint endpoint, String predefinedAddress);
        void detach(TransportEndpoint endpoint);
+       
+       void sendMessage(String soureEndpointAddress, String 
targetEndpointAddress, 
+               String multiplexier, String message);
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/1d750632/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
index 665d207..551838e 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
@@ -32,4 +32,11 @@ public class ClientTransportProvider implements 
TransportProvider {
        public void detach(TransportEndpoint endpoint) {
                // TODO Auto-generated method stub
        }
+       
+       @Override
+       public void sendMessage(String soureEndpointAddress, String 
targetEndpointAddress, 
+               String multiplexier, String message) {
+               // TODO
+       }
+       
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/1d750632/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
----------------------------------------------------------------------
diff --git 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
index 1f7c12b..332e788 100644
--- 
a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
+++ 
b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
@@ -34,4 +34,10 @@ public class ServerTransportProvider implements 
TransportProvider {
                // TODO Auto-generated method stub
                
        }
+       
+       @Override
+       public void sendMessage(String soureEndpointAddress, String 
targetEndpointAddress, 
+               String multiplexier, String message) {
+               // TODO
+       }
 }

Reply via email to