Fixes/tests on sample management server on new RPC/Async framework
Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/e72417a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/e72417a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/e72417a1 Branch: refs/heads/javelin Commit: e72417a1e71178984e4445616a4907c7293ead67 Parents: a6c441f Author: Kelven Yang <[email protected]> Authored: Thu Dec 13 15:18:25 2012 -0800 Committer: Kelven Yang <[email protected]> Committed: Thu Dec 13 15:19:01 2012 -0800 ---------------------------------------------------------------------- .../framework/messaging/JsonMessageSerializer.java | 6 +++- .../framework/messaging/OnwireClassRegistry.java | 1 + .../framework/messaging/RpcProvider.java | 1 + .../framework/messaging/RpcProviderImpl.java | 23 ++++---------- .../framework/messaging/TransportProvider.java | 3 ++ .../messaging/client/ClientTransportProvider.java | 14 ++++++++ .../messaging/server/ServerTransportProvider.java | 14 ++++++++ .../resources/SampleManagementServerAppContext.xml | 24 ++++++++++----- 8 files changed, 60 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e72417a1/framework/ipc/src/org/apache/cloudstack/framework/messaging/JsonMessageSerializer.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/JsonMessageSerializer.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/JsonMessageSerializer.java index 8d4246b..3fed857 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/JsonMessageSerializer.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/JsonMessageSerializer.java @@ -19,16 +19,20 @@ package org.apache.cloudstack.framework.messaging; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; public class JsonMessageSerializer implements MessageSerializer { // this will be injected from external to allow installation of - // type adapters need by upper layer applications + // type adapters needed by upper layer applications private Gson _gson; private OnwireClassRegistry _clzRegistry; public JsonMessageSerializer() { + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.setVersion(1.5); + _gson = gsonBuilder.create(); } public Gson getGson() { http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e72417a1/framework/ipc/src/org/apache/cloudstack/framework/messaging/OnwireClassRegistry.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/OnwireClassRegistry.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/OnwireClassRegistry.java index d26777e..7ac64bb 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/OnwireClassRegistry.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/OnwireClassRegistry.java @@ -42,6 +42,7 @@ public class OnwireClassRegistry { private Map<String, Class<?>> registry = new HashMap<String, Class<?>>(); public OnwireClassRegistry() { + registry.put("Object", Object.class); } public OnwireClassRegistry(String packageName) { http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e72417a1/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java index b7c3fd6..27096a5 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java @@ -28,6 +28,7 @@ public interface RpcProvider extends TransportMultiplexier { void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint); void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint); + RpcClientCall newCall(); RpcClientCall newCall(String targetAddress); RpcClientCall newCall(TransportAddressMapper targetAddress); http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e72417a1/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java index 9be50e5..d8bf886 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java @@ -30,7 +30,7 @@ public class RpcProviderImpl implements RpcProvider { private String _transportAddress; private RpcTransportEndpoint _transportEndpoint = new RpcTransportEndpoint(); // transport attachment at RPC layer - private MessageSerializer _messageSerializer = new JsonMessageSerializer(); // default message serializer + private MessageSerializer _messageSerializer; private List<RpcServiceEndpoint> _serviceEndpoints = new ArrayList<RpcServiceEndpoint>(); private Map<Long, RpcClientCall> _outstandingCalls = new HashMap<Long, RpcClientCall>(); @@ -103,6 +103,11 @@ public class RpcProviderImpl implements RpcProvider { } @Override + public RpcClientCall newCall() { + return newCall(TransportAddress.getLocalPredefinedTransportAddress("RpcProvider").toString()); + } + + @Override public RpcClientCall newCall(String targetAddress) { long callTag = getNextCallTag(); @@ -111,22 +116,6 @@ public class RpcProviderImpl implements RpcProvider { call.setTargetAddress(targetAddress); call.setCallTag(callTag); - RpcCallRequestPdu pdu = new RpcCallRequestPdu(); - pdu.setCommand(call.getCommand()); - pdu.setRequestTag(callTag); - pdu.setRequestStartTick(System.currentTimeMillis()); - - String serializedCmdArg; - if(call.getCommandArg() != null) - serializedCmdArg = _messageSerializer.serializeTo(call.getCommandArg().getClass(), call.getCommandArg()); - else - serializedCmdArg = _messageSerializer.serializeTo(Object.class, null); - pdu.setSerializedCommandArg(serializedCmdArg); - - String serializedPdu = _messageSerializer.serializeTo(RpcCallRequestPdu.class, pdu); - _transportProvider.sendMessage(_transportAddress, targetAddress, RPC_MULTIPLEXIER, - serializedPdu); - return call; } http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e72417a1/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java index e25407f..71db312 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java @@ -19,6 +19,9 @@ package org.apache.cloudstack.framework.messaging; public interface TransportProvider { + void setMessageSerializer(MessageSerializer messageSerializer); + MessageSerializer getMessageSerializer(); + TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress); boolean detach(TransportEndpoint endpoint); http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e72417a1/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java index c2bbef7..fb522c2 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java @@ -18,12 +18,15 @@ */ package org.apache.cloudstack.framework.messaging.client; +import org.apache.cloudstack.framework.messaging.MessageSerializer; import org.apache.cloudstack.framework.messaging.TransportEndpoint; import org.apache.cloudstack.framework.messaging.TransportEndpointSite; import org.apache.cloudstack.framework.messaging.TransportProvider; public class ClientTransportProvider implements TransportProvider { + private MessageSerializer _messageSerializer; + @Override public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) { // TODO Auto-generated method stub @@ -36,6 +39,17 @@ public class ClientTransportProvider implements TransportProvider { return false; } + + @Override + public void setMessageSerializer(MessageSerializer messageSerializer) { + assert(messageSerializer != null); + _messageSerializer = messageSerializer; + } + + @Override + public MessageSerializer getMessageSerializer() { + return _messageSerializer; + } @Override public void requestSiteOutput(TransportEndpointSite site) { http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e72417a1/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java index 014c8fe..98177d6 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java @@ -24,6 +24,7 @@ import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.cloudstack.framework.messaging.MessageSerializer; import org.apache.cloudstack.framework.messaging.TransportAddress; import org.apache.cloudstack.framework.messaging.TransportDataPdu; import org.apache.cloudstack.framework.messaging.TransportEndpoint; @@ -46,6 +47,8 @@ public class ServerTransportProvider implements TransportProvider { private ExecutorService _executor; private int _nextEndpointId = new Random().nextInt(); + + private MessageSerializer _messageSerializer; public ServerTransportProvider() { } @@ -70,6 +73,17 @@ public class ServerTransportProvider implements TransportProvider { return this; } + @Override + public void setMessageSerializer(MessageSerializer messageSerializer) { + assert(messageSerializer != null); + _messageSerializer = messageSerializer; + } + + @Override + public MessageSerializer getMessageSerializer() { + return _messageSerializer; + } + public void initialize() { _executor = Executors.newFixedThreadPool(_poolSize, new NamedThreadFactory("Transport-Worker")); } http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e72417a1/framework/ipc/test/resources/SampleManagementServerAppContext.xml ---------------------------------------------------------------------- diff --git a/framework/ipc/test/resources/SampleManagementServerAppContext.xml b/framework/ipc/test/resources/SampleManagementServerAppContext.xml index f53ced0..9709f2f 100644 --- a/framework/ipc/test/resources/SampleManagementServerAppContext.xml +++ b/framework/ipc/test/resources/SampleManagementServerAppContext.xml @@ -16,22 +16,30 @@ <context:annotation-config /> <context:component-scan base-package="org.apache.cloudstack, com.cloud" /> + <bean id="onwireRegistry" class="org.apache.cloudstack.framework.messaging.OnwireClassRegistry" + init-method="scan" > + <property name="packages"> + <list> + <value>org.apache.cloudstack.framework.messaging</value> + </list> + </property> + </bean> + + <bean id="messageSerializer" class="org.apache.cloudstack.framework.messaging.JsonMessageSerializer"> + <property name="onwireClassRegistry" ref="onwireRegistry" /> + </bean> + <bean id="transportProvider" class="org.apache.cloudstack.framework.messaging.server.ServerTransportProvider" init-method="initialize"> <property name="workerPoolSize" value="5" /> <property name="nodeId" value="Node1" /> + <property name="messageSerializer" ref="messageSerializer" /> </bean> + <bean id="rpcProvider" class="org.apache.cloudstack.framework.messaging.RpcProviderImpl" init-method="initialize"> <constructor-arg ref="transportProvider" /> + <property name="messageSerializer" ref="messageSerializer" /> </bean> <bean id="eventBus" class = "org.apache.cloudstack.framework.messaging.EventBusBase" /> - <bean id="onwireRegistry" class="org.apache.cloudstack.framework.messaging.OnwireClassRegistry" - init-method="scan" > - <property name="packages"> - <list> - <value>org.apache.cloudstack.framework.messaging</value> - </list> - </property> - </bean> </beans>
