gdamour 2004/03/24 03:37:07
Modified:
sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
StreamManagerImpl.java RequestSender.java
Connector.java MetaConnection.java
sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication
ReplicationMember.java
sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging
DummyConnector.java CommandRequestTest.java
sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/replication
ReplicationTest.java
Added:
sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
AbstractConnector.java Node.java NodeContext.java
NodeImpl.java NodeProcessors.java
sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication
ReplicationMemberImpl.java
sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging
NodeTest.java
Removed:
sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
ServerNode.java ServerNodeContext.java
ServerProcessors.java
sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging
ServerNodeTest.java
Log:
Round of refactoring in order to be compliant with the GBean framework:
o Interfaces have been defined for References;
o It is now up to Connectors to add/remove themselves to the Node
enabling Msgs passing;
o A base implementation for the Connector contract is provided;
o JUnit tests have been updated in order to also test the GBean
configuration;
o ServerNode, ServerNodeContext and ServerProcessors have been
renamed Node, NodeContext and NodeProcessors respectively.
Revision Changes Path
1.4 +6 -50
incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamManagerImpl.java
Index: StreamManagerImpl.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamManagerImpl.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- StreamManagerImpl.java 11 Mar 2004 15:36:14 -0000 1.3
+++ StreamManagerImpl.java 24 Mar 2004 11:37:05 -0000 1.4
@@ -34,6 +34,7 @@
* @version $Revision$ $Date$
*/
public class StreamManagerImpl
+ extends AbstractConnector
implements Connector, StreamManager
{
@@ -70,26 +71,13 @@
private final Map inputStreams;
/**
- * Context of the ServerNode which has mounted this instance.
- */
- protected ServerNodeContext serverNodeContext;
-
- /**
- * To send requests.
- */
- protected RequestSender sender;
-
- /**
- * Used to communicate with remote StreamManagers.
- */
- protected MsgOutInterceptor out;
-
- /**
* Creates a manager owned by the specified node.
*
+ * @param aServerNode ServerNode containing this instance.
* @param aNode Node owning this manager.
*/
- public StreamManagerImpl(NodeInfo aNode) {
+ public StreamManagerImpl(NodeImpl aServerNode, NodeInfo aNode) {
+ super(aServerNode);
if ( null == aNode ) {
throw new IllegalArgumentException("Node is required.");
}
@@ -101,12 +89,6 @@
return owningNode.getName();
}
- public void setContext(ServerNodeContext aContext) {
- serverNodeContext = aContext;
- sender = aContext.getRequestSender();
- out = aContext.getOutput();
- }
-
public Object register(InputStream anIn) {
if ( null == anIn ) {
return NULL_INPUT_STREAM;
@@ -175,17 +157,6 @@
}
- public void deliver(Msg aMsg) {
- MsgHeader header = aMsg.getHeader();
- MsgBody.Type bodyType =
- (MsgBody.Type) header.getHeader(MsgHeaderConstants.BODY_TYPE);
- if ( bodyType.equals(MsgBody.Type.REQUEST) ) {
- handleRequest(aMsg);
- } else if ( bodyType.equals(MsgBody.Type.RESPONSE) ) {
- handleResponse(aMsg);
- }
- }
-
/**
* Handles a request Msg.
*
@@ -221,21 +192,6 @@
reqOut.push(msg);
}
- /**
- * Handles a response Msg.
- *
- * @param aMsg Response to be handled.
- */
- protected void handleResponse(Msg aMsg) {
- MsgBody body = aMsg.getBody();
- MsgHeader header = aMsg.getHeader();
- CommandResult result;
- result = (CommandResult) body.getContent();
- sender.setResponse(
- header.getHeader(MsgHeaderConstants.CORRELATION_ID),
- result);
- }
-
/**
* InputStream returned when a GInputStream is deserialized. This
* InputStream calls back its StreamManager when its internal buffer is
1.6 +2 -1
incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/RequestSender.java
Index: RequestSender.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/RequestSender.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- RequestSender.java 18 Mar 2004 12:14:05 -0000 1.5
+++ RequestSender.java 24 Mar 2004 11:37:05 -0000 1.6
@@ -107,6 +107,7 @@
RequestID id = createID(aTargetNodes);
header.addHeader(MsgHeaderConstants.CORRELATION_ID, id);
header.addHeader(MsgHeaderConstants.DEST_NODES, aTargetNodes);
+ header.addHeader(MsgHeaderConstants.BODY_TYPE, MsgBody.Type.REQUEST);
MsgBody body = msg.getBody();
body.setContent(anOpaque);
1.3 +2 -2
incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Connector.java
Index: Connector.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Connector.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- Connector.java 11 Mar 2004 15:36:14 -0000 1.2
+++ Connector.java 24 Mar 2004 11:37:05 -0000 1.3
@@ -41,7 +41,7 @@
/**
* Sets the ServerNode context of this Connector.
*/
- public void setContext(ServerNodeContext aContext);
+ public void setContext(NodeContext aContext);
/**
* When a ServerNode receives a Msg to be delivered to a Connector, it
1.6 +4 -4
incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MetaConnection.java
Index: MetaConnection.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MetaConnection.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- MetaConnection.java 18 Mar 2004 12:14:05 -0000 1.5
+++ MetaConnection.java 24 Mar 2004 11:37:05 -0000 1.6
@@ -43,7 +43,7 @@
/**
* Node owning this connection.
*/
- private final ServerNode node;
+ private final NodeImpl node;
/**
* NodeInfo to Connection map.
@@ -72,7 +72,7 @@
*
* @param aNode Node.
*/
- public MetaConnection(ServerNode aNode) {
+ public MetaConnection(NodeImpl aNode) {
if ( null == aNode ) {
throw new IllegalArgumentException("Node is required.");
}
1.1
incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/AbstractConnector.java
Index: AbstractConnector.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.messaging;
import java.util.Arrays;
import org.apache.geronimo.gbean.GAttributeInfo;
import org.apache.geronimo.gbean.GBean;
import org.apache.geronimo.gbean.GBeanContext;
import org.apache.geronimo.gbean.GBeanInfo;
import org.apache.geronimo.gbean.GBeanInfoFactory;
import org.apache.geronimo.gbean.GConstructorInfo;
import org.apache.geronimo.gbean.WaitingException;
/**
* Based implementation for the Connector contracts.
*
* @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:05 $
*/
public abstract class AbstractConnector implements Connector, GBean {
/**
* Node owning this Connector.
*/
protected Node node;
/**
* Context of the ServerNode which has mounted this instance.
*/
protected NodeContext serverNodeContext;
/**
* To send requests.
*/
protected RequestSender sender;
/**
* Used to communicate with remote Connectors.
*/
protected MsgOutInterceptor out;
/**
* Creates a Connector, which is mounted by the specified node.
*
* @param aNode Node owning this connector.
*/
public AbstractConnector(Node aNode) {
if ( null == aNode ) {
throw new IllegalArgumentException("Node is required.");
}
node = aNode;
}
public void setContext(NodeContext aContext) {
serverNodeContext = aContext;
sender = aContext.getRequestSender();
out = aContext.getOutput();
}
public void deliver(Msg aMsg) {
MsgHeader header = aMsg.getHeader();
MsgBody.Type bodyType =
(MsgBody.Type) header.getHeader(MsgHeaderConstants.BODY_TYPE);
if ( MsgBody.Type.REQUEST == bodyType ) {
handleRequest(aMsg);
} else if ( MsgBody.Type.RESPONSE == bodyType ) {
handleResponse(aMsg);
}
}
/**
* Handles a request Msg.
*
* @param aMsg Request Msg to be handled.
*/
protected abstract void handleRequest(Msg aMsg);
/**
* Handles a response Msg.
*
* @param aMsg Response to be handled.
*/
protected void handleResponse(Msg aMsg) {
MsgBody body = aMsg.getBody();
MsgHeader header = aMsg.getHeader();
CommandResult result;
result = (CommandResult) body.getContent();
sender.setResponse(
header.getHeader(MsgHeaderConstants.CORRELATION_ID),
result);
}
public void setGBeanContext(GBeanContext context) {
}
public void doStart() throws WaitingException, Exception {
node.addConnector(this);
}
public void doStop() throws WaitingException, Exception {
node.removeConnector(this);
}
public void doFail() {
node.removeConnector(this);
}
public static final GBeanInfo GBEAN_INFO;
static {
GBeanInfoFactory infoFactory = new GBeanInfoFactory("Abstract
Connector", AbstractConnector.class.getName());
infoFactory.addAttribute(new GAttributeInfo("Name", true));
infoFactory.addAttribute(new GAttributeInfo("Context", false));
infoFactory.addReference("Node", Node.class);
infoFactory.addOperation("deliver", new Class[] {Msg.class});
infoFactory.setConstructor(new GConstructorInfo(
Arrays.asList(new Object[]{"Node"}),
Arrays.asList(new Object[]{Node.class})));
GBEAN_INFO = infoFactory.getBeanInfo();
}
public static GBeanInfo getGBeanInfo() {
return GBEAN_INFO;
}
}
1.1
incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Node.java
Index: Node.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.messaging;
import java.io.IOException;
/**
 * Contract of a node: it exposes its NodeInfo and StreamManager, can be
 * given a Topology, can join or leave remote nodes and lets Connectors
 * register and unregister themselves.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:05 $
 */
public interface Node {

    /**
     * Returns the NodeInfo of this node.
     *
     * @return NodeInfo.
     */
    NodeInfo getNodeInfo();

    /**
     * Defines the topology of nodes in which this instance operates.
     *
     * @param aTopology Topology of the nodes constituting the network layout.
     */
    void setTopology(Topology aTopology);

    /**
     * Joins the remote node which is uniquely identified on the network by
     * aNodeInfo.
     *
     * @param aNodeInfo NodeInfo of a remote node to join.
     * @throws IOException Indicates that an I/O error has occurred.
     * @throws CommunicationException Indicates that the node can not be
     * registered by the remote node identified by aNodeInfo.
     */
    void join(NodeInfo aNodeInfo) throws IOException, CommunicationException;

    /**
     * Leaves the remote node which is uniquely identified on the network by
     * aNodeInfo.
     *
     * @param aNodeInfo NodeInfo of the remote node to leave.
     * @throws IOException Indicates that an I/O error has occurred.
     * @throws CommunicationException Indicates that the node has not
     * successfully left the remote node.
     */
    void leave(NodeInfo aNodeInfo) throws IOException, CommunicationException;

    /**
     * Returns the StreamManager of this node.
     *
     * @return StreamManager used by this node to resolve/encode
     * InputStreams.
     */
    StreamManager getStreamManager();

    /**
     * Adds a Connector to the ones registered by this node.
     *
     * @param aConnector Connector to be registered.
     */
    void addConnector(Connector aConnector);

    /**
     * Removes a Connector from the ones registered by this node.
     *
     * @param aConnector Connector to be deregistered.
     */
    void removeConnector(Connector aConnector);

}
1.1
incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/NodeContext.java
Index: NodeContext.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.messaging;
/**
 * Immutable holder of what a Node hands over to a Connector: a Msg output
 * and a RequestSender.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:05 $
 */
public class NodeContext {

    /**
     * Msg outbound interceptor exposed by getOutput.
     */
    private final MsgOutInterceptor output;

    /**
     * RequestSender exposed by getRequestSender.
     */
    private final RequestSender requestSender;

    /**
     * Creates a context wrapping the provided collaborators. They are stored
     * as is; no validation is performed.
     *
     * @param anOut Msg outbound interceptor.
     * @param aSender RequestSender.
     */
    public NodeContext(MsgOutInterceptor anOut, RequestSender aSender) {
        this.output = anOut;
        this.requestSender = aSender;
    }

    /**
     * Gets the Msg outbound interceptor to be used to contact remote
     * Connectors.
     *
     * @return Interceptor provided at construction time.
     */
    public MsgOutInterceptor getOutput() {
        return this.output;
    }

    /**
     * Gets the RequestSender to be used to send requests to remote
     * Connectors.
     *
     * @return RequestSender provided at construction time.
     */
    public RequestSender getRequestSender() {
        return this.requestSender;
    }

}
1.1
incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/NodeImpl.java
Index: NodeImpl.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.messaging;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geronimo.gbean.GBean;
import org.apache.geronimo.gbean.GBeanContext;
import org.apache.geronimo.gbean.GBeanInfo;
import org.apache.geronimo.gbean.GBeanInfoFactory;
import org.apache.geronimo.gbean.WaitingException;
import org.mortbay.util.ThreadedServer;
/**
 * It allows a remote connectivity to a set of Connectors.
 * <BR>
 * It is the only component dealing directly with raw connections: it directly
 * accesses the InputStream and OutputStream of the registered connections. It
 * insulates the other components from connectivity issues.
 * <BR>
 * It is also in charge of dispatching the incoming Msgs to the registered
 * Connectors.
 * <BR>
 * The following diagram shows how Nodes and Connectors are combined
 * together:
 *
 * Connector -- MTO -- Node -- MTM -- Node -- OTM -- Connector
 *
 * Connectors communicate with each other by sending Msgs.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:05 $
 */
public class NodeImpl
    implements Node, GBean
{

    private static final Log log = LogFactory.getLog(NodeImpl.class);

    /**
     * Context set on a Connector when it is unregistered: it carries neither
     * an output nor a RequestSender.
     */
    private static final NodeContext NULL_CONTEXT =
        new NodeContext(null, null);

    /**
     * Node meta-data.
     */
    private final NodeInfo nodeInfo;

    /**
     * Connectors registered by this server. Connector name to Connector Map;
     * accesses are synchronized on the Map itself.
     */
    private final Map connectors;

    /**
     * StreamManager to register/retrieve distributed InputStream.
     */
    private final StreamManager streamManager;

    /**
     * Server listening for connections to be made.
     */
    private final InternalServer server;

    /**
     * Inbound Msg queue. This queue is filled by Msgs coming directly
     * from the network connections.
     */
    final MsgQueue queueIn;

    /**
     * Inbound Msgs reactor. It is between the inbound Msg queue and
     * the Connectors.
     */
    final HeaderReactor inReactor;

    /**
     * Outbound Msgs queue. This queue is a staging repository for Msgs
     * to be sent over the network.
     */
    final MsgQueue queueOut;

    /**
     * Connections to this server. The key is the name of the node and
     * the value is a ConnectionWrapper.
     * <BR>
     * NOTE(review): this class only creates the Map; it is presumably
     * populated by other classes of this package - to be confirmed.
     */
    final Map connections;

    /**
     * Processors of this server.
     */
    final NodeProcessors processors;

    /**
     * MetaConnection to other nodes.
     */
    final MetaConnection metaConnection;

    /**
     * RequestSender shared with all the registered Connectors.
     */
    private final RequestSender sender;

    /**
     * GBean context; used to signal a failure of the internal server.
     */
    private GBeanContext context;

    /**
     * Creates a server.
     *
     * @param aNodeInfo NodeInfo identifying uniquely this node on the
     * network.
     * @throws IllegalArgumentException Indicates that aNodeInfo is null.
     */
    public NodeImpl(NodeInfo aNodeInfo) {
        if ( null == aNodeInfo ) {
            throw new IllegalArgumentException("NodeInfo is required.");
        }
        nodeInfo = aNodeInfo;
        sender = new RequestSender(nodeInfo);

        // The creation order matters: the collaborators receiving this
        // instance read back fields initialized above them (e.g.
        // NodeProcessors retrieves the NodeInfo and the StreamManager).
        server = new InternalServer();
        metaConnection = new MetaConnection(this);
        streamManager = new StreamManagerImpl(this, nodeInfo);
        processors = new NodeProcessors(this);

        queueIn = new MsgQueue(nodeInfo.getName() + " Inbound");
        queueOut = new MsgQueue(nodeInfo.getName() + " Outbound");

        connections = new HashMap();

        // The incoming messages are dispatched to the clients.
        inReactor = new HeaderReactor(
            new HeaderInInterceptor(
                new QueueInInterceptor(queueIn),
                MsgHeaderConstants.DEST_CONNECTOR),
            processors.getProcessors());
        inReactor.register(StreamManager.NAME, streamManager);

        // The stream manager writes to the output queue of the server.
        NodeContext nodeContext = new NodeContext(
            new HeaderOutInterceptor(
                MsgHeaderConstants.SRC_CONNECTOR,
                StreamManager.NAME,
                new QueueOutInterceptor(queueOut)),
            sender
            );
        streamManager.setContext(nodeContext);

        connectors = new HashMap();
    }

    /**
     * Gets the NodeInfo of this node.
     *
     * @return NodeInfo.
     */
    public NodeInfo getNodeInfo() {
        return nodeInfo;
    }

    /**
     * Sets the node topology in which this instance is operating.
     *
     * @param aTopology Topology of the nodes constituting the network layout.
     */
    public void setTopology(Topology aTopology) {
        metaConnection.setTopology(aTopology);
    }

    /**
     * Joins the node uniquely identified on the network by aNodeInfo.
     *
     * @param aNodeInfo NodeInfo of a remote node to join.
     * @throws IOException Indicates that an I/O error has occurred.
     * @throws CommunicationException Indicates that the node can not be
     * registered by the remote node identified by aNodeInfo.
     */
    public void join(NodeInfo aNodeInfo)
        throws IOException, CommunicationException {
        metaConnection.join(aNodeInfo);
    }

    /**
     * Leaves the node uniquely identified on the network by aNodeInfo.
     *
     * @param aNodeInfo NodeInfo of the remote node to leave.
     * @throws IOException Indicates that an I/O error has occurred.
     * @throws CommunicationException Indicates that the node has not
     * successfully left the remote node.
     */
    public void leave(NodeInfo aNodeInfo)
        throws IOException, CommunicationException {
        metaConnection.leave(aNodeInfo);
    }

    /**
     * Gets the StreamManager of this server.
     *
     * @return StreamManager used by this server to resolve/encode
     * InputStreams.
     */
    public StreamManager getStreamManager() {
        return streamManager;
    }

    /**
     * Gets the Output to be used to communicate with the specified node.
     * <BR>
     * aNode must be a node directly connected to this instance.
     *
     * @param aNode Node.
     * @return Output to be used to communicate with the specified node.
     * @throws CommunicationException Propagated from the MetaConnection.
     */
    public MsgOutInterceptor getRawOutForNode(NodeInfo aNode)
        throws CommunicationException {
        return metaConnection.getRawOutForNode(aNode);
    }

    /**
     * Gets the Output to be used to communicate with the specified node.
     * <BR>
     * aNode can be a node anywhere in the topology.
     *
     * @param aNode Node.
     * @return Output to be used to communicate with the specified node.
     * @throws CommunicationException Propagated from the MetaConnection.
     */
    protected MsgOutInterceptor getOutForNode(NodeInfo aNode)
        throws CommunicationException {
        return metaConnection.getOutForNode(aNode);
    }

    /**
     * Registers a new Connector: it receives a context writing to the
     * outbound Msg queue and is registered with the inbound reactor under
     * its name.
     *
     * @param aConnector Connector to be registered.
     */
    public void addConnector(Connector aConnector) {
        String pName = aConnector.getName();
        // Connectors write to the outbound Msg queue.
        NodeContext nodeContext = new NodeContext(
            new HeaderOutInterceptor(
                MsgHeaderConstants.SRC_CONNECTOR,
                pName,
                new QueueOutInterceptor(queueOut)),
            sender);
        aConnector.setContext(nodeContext);
        inReactor.register(pName, aConnector);
        synchronized (connectors) {
            connectors.put(pName, aConnector);
        }
    }

    /**
     * Unregisters the Connector: its context is replaced by an empty one and
     * it is removed from the inbound reactor.
     *
     * @param aConnector Connector to be deregistered.
     */
    public void removeConnector(Connector aConnector) {
        String pName = aConnector.getName();
        aConnector.setContext(NULL_CONTEXT);
        inReactor.unregister(pName);
        synchronized (connectors) {
            connectors.remove(pName);
        }
    }

    public void setGBeanContext(GBeanContext aContext) {
        context = aContext;
    }

    /**
     * Starts the internal server and then the processors.
     */
    public void doStart() throws WaitingException, Exception {
        server.start();
        processors.start();
    }

    /**
     * Stops the internal server, the MetaConnection and the processors.
     */
    public void doStop() throws WaitingException, Exception {
        server.stop();
        metaConnection.stop();
        processors.stop();
    }

    /**
     * Releases the resources of this node after a failure.
     */
    public void doFail() {
        // One uses fail() and not stop(): stop() signals a failure to the
        // GBean context when it is interrupted, which must not happen while
        // a failure is already being handled.
        server.fail();
        metaConnection.stop();
        processors.stop();
    }

    public String toString() {
        return "Node {" + nodeInfo + "}";
    }

    /**
     * Socket server listening for connections to be made to this node.
     */
    private class InternalServer extends ThreadedServer {

        /**
         * Creates a server bound to the address and port of the NodeInfo of
         * the enclosing node.
         */
        public InternalServer() {
            super(nodeInfo.getAddress(), nodeInfo.getPort());
            // No socket timeout.
            setMaxIdleTimeMs(0);
        }

        /**
         * Handles a new connection by handing its streams over to the
         * MetaConnection. Errors are logged and not propagated.
         */
        protected void handleConnection(InputStream anIn,OutputStream anOut) {
            try {
                metaConnection.joined(anIn, anOut);
            } catch (IOException e) {
                log.error("I/O error while handling a new connection to "
                    + NodeImpl.this, e);
            } catch (CommunicationException e) {
                log.error("Cannot handle a new connection to "
                    + NodeImpl.this, e);
            }
        }

        /**
         * Starts the server. A start-up error is logged and signaled to the
         * GBean context.
         */
        public void start() {
            try {
                super.start();
            } catch (Exception e) {
                log.error("Cannot start the server of " + NodeImpl.this, e);
                context.fail();
            }
        }

        /**
         * Stops the server. An interruption is logged and signaled to the
         * GBean context.
         */
        public void stop() {
            try {
                super.stop();
            } catch (InterruptedException e) {
                log.error("Interrupted while stopping the server of "
                    + NodeImpl.this, e);
                context.fail();
            }
        }

        /**
         * Stops the server without signaling anything to the GBean context.
         */
        public void fail() {
            try {
                super.stop();
            } catch (InterruptedException e) {
                log.error("Interrupted while stopping the failed server of "
                    + NodeImpl.this, e);
            }
        }

    }

    public static final GBeanInfo GBEAN_INFO;

    static {
        GBeanInfoFactory factory = new GBeanInfoFactory(NodeImpl.class);
        factory.setConstructor(
            new String[] {"NodeInfo"},
            new Class[] {NodeInfo.class});
        factory.addAttribute("NodeInfo", true);
        factory.addAttribute("Topology", true);
        factory.addAttribute("StreamManager", false);
        factory.addOperation("join", new Class[]{NodeInfo.class});
        factory.addOperation("leave", new Class[]{NodeInfo.class});
        factory.addOperation("addConnector", new Class[]{Connector.class});
        factory.addOperation("removeConnector", new Class[]{Connector.class});
        GBEAN_INFO = factory.getBeanInfo();
    }

    /**
     * Gets the GBeanInfo describing this class.
     *
     * @return GBeanInfo built by the static initializer.
     */
    public static GBeanInfo getGBeanInfo() {
        return GBEAN_INFO;
    }

}
1.1
incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/NodeProcessors.java
Index: NodeProcessors.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.messaging;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * Processors associated to a server: they dispatch the inbound Msgs and
 * push the outbound Msgs to the relevant nodes.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:05 $
 */
class NodeProcessors
{

    private static final Log log = LogFactory.getLog(NodeProcessors.class);

    /**
     * Server owning these processors.
     */
    private final NodeImpl server;

    /**
     * StreamManager used by the server to resolve InputStreams.
     */
    private final StreamManager streamManager;

    /**
     * Processor pool.
     */
    private final Processors processors;

    /**
     * Creates processors for the provided server.
     *
     * @param aServer Server owning these processors.
     */
    public NodeProcessors(NodeImpl aServer) {
        server = aServer;
        // NOTE(review): 2 and 10 are presumably the minimum and maximum
        // sizes of the pool - to be confirmed against Processors.
        processors = new Processors(aServer.getNodeInfo().getName(), 2, 10);
        streamManager = aServer.getStreamManager();
    }

    /**
     * Execute a Processor in a separate Thread.
     *
     * @param aProcessor Processor to be executed.
     */
    public void execute(Processor aProcessor) {
        processors.execute(aProcessor);
    }

    /**
     * Gets the underlying Processor pool.
     *
     * @return Processor pool.
     */
    public Processors getProcessors() {
        return processors;
    }

    /**
     * Dispatches the Msgs sitting in the inbound queue. Pushes the Msgs
     * sitting in the outbound queue to the relevant node.
     */
    public void start() {
        processors.execute(server.inReactor);
        processors.execute(new OutputQueueDispatcher());
    }

    /**
     * Stops the Processor pool.
     */
    public void stop() {
        processors.stop();
    }

    /**
     * Runnable in charge of dispatching the Msgs sitting in the outbound
     * queue to the relevant node.
     */
    private class OutputQueueDispatcher implements Processor {

        /**
         * Pops Msgs from the outbound queue until the queue interceptor is
         * stopped. A copy of each Msg is pushed to each of its destination
         * nodes; a destination which can not be reached is logged and
         * skipped.
         */
        public void run() {
            HeaderInInterceptor in =
                new HeaderInInterceptor(
                    new QueueInInterceptor(server.queueOut),
                    MsgHeaderConstants.DEST_NODES);
            while ( true ) {
                Msg msg;
                try {
                    msg = in.pop();
                } catch (MsgInterceptorStoppedException e) {
                    log.info("Stopping OutputQueueDispatcher", e);
                    return;
                }
                // The destination is either a single NodeInfo or an array of
                // them; one normalizes it to an array.
                Object destNode = in.getHeader();
                if ( destNode instanceof NodeInfo ) {
                    destNode = new NodeInfo[] {(NodeInfo) destNode};
                }
                NodeInfo[] dests = (NodeInfo[]) destNode;
                for (int i = 0; i < dests.length; i++) {
                    NodeInfo target = dests[i];
                    Msg msg2 = new Msg(msg);
                    MsgHeader header = msg2.getHeader();
                    // A path is defined if this Msg is routed via the node
                    // owning this instance.
                    NodeInfo[] path = (NodeInfo[])
                        header.getOptionalHeader(
                            MsgHeaderConstants.DEST_NODE_PATH);
                    MsgOutInterceptor out;
                    try {
                        if ( null != path ) {
                            // The next hop is the head of the path; the
                            // header is replaced by the result of popping
                            // the path.
                            target = path[0];
                            header.addHeader(
                                MsgHeaderConstants.DEST_NODE_PATH,
                                NodeInfo.pop(path));
                            out = server.getRawOutForNode(target);
                        } else {
                            out = server.getOutForNode(target);
                        }
                    } catch (CommunicationException e) {
                        // Names the unreachable node and keeps the stack
                        // trace: the copy of the Msg for this target is
                        // dropped.
                        log.error("Cannot get an output for the node {"
                            + target + "}; the Msg is dropped.", e);
                        continue;
                    }
                    out.push(msg2);
                }
            }
        }

    }

}
1.3 +8 -261
incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationMember.java
Index: ReplicationMember.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationMember.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- ReplicationMember.java 11 Mar 2004 15:36:14 -0000 1.2
+++ ReplicationMember.java 24 Mar 2004 11:37:06 -0000 1.3
@@ -17,115 +17,16 @@
package org.apache.geronimo.datastore.impl.remote.replication;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.geronimo.datastore.impl.remote.messaging.CommandRequest;
-import org.apache.geronimo.datastore.impl.remote.messaging.CommandResult;
import org.apache.geronimo.datastore.impl.remote.messaging.Connector;
-import
org.apache.geronimo.datastore.impl.remote.messaging.HeaderOutInterceptor;
-import org.apache.geronimo.datastore.impl.remote.messaging.Msg;
-import org.apache.geronimo.datastore.impl.remote.messaging.MsgBody;
-import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeader;
-import
org.apache.geronimo.datastore.impl.remote.messaging.MsgHeaderConstants;
-import org.apache.geronimo.datastore.impl.remote.messaging.MsgOutInterceptor;
-import org.apache.geronimo.datastore.impl.remote.messaging.NodeInfo;
-import org.apache.geronimo.datastore.impl.remote.messaging.RequestSender;
-import org.apache.geronimo.datastore.impl.remote.messaging.ServerNodeContext;
-import org.apache.geronimo.gbean.GBean;
-import org.apache.geronimo.gbean.GBeanContext;
-import org.apache.geronimo.gbean.WaitingException;
/**
- * A replication group member.
- * <BR>
- * This is a Connector in charge of replicating the state of registered
- * ReplicantCapables across N-nodes, which constitute a replication group.
- * <BR>
- * Replication members are organized as follow:
- * <pre>
- * ReplicationMember -- MTO -- ServerNode -- MTM -- ServerNode -- OTM --
ReplicationMember
- * </pre>
*
* @version $Revision$ $Date$
*/
-public class ReplicationMember
- implements UpdateListener, Connector, GBean
-{
-
- /**
- * Name of the replication group.
- */
- private final String name;
-
- /**
- * ReplicantID to ReplicantCapable Map.
- */
- private final Map idToReplicant;
-
- /**
- * Nodes hosting the other members of the replication group
- * of this member.
- */
- private NodeInfo[] targetNodes;
+public interface ReplicationMember
+ extends UpdateListener, Connector {
/**
- * Context of the ServerNode which has mounted this instance.
- */
- protected ServerNodeContext serverNodeContext;
-
- /**
- * Output to be used to send requests.
- */
- private MsgOutInterceptor requestOut;
-
- /**
- * Output to be used to send results.
- */
- private MsgOutInterceptor resultOut;
-
- /**
- * Requests sender.
- */
- private RequestSender sender;
-
- /**
- * Creates a replication group member.
- *
- * @param aName Name of the replication group owning this member.
- * @param aTargetNodes Nodes hosting the other members of the
- * replication group containing this member.
- */
- public ReplicationMember(String aName, NodeInfo[] aTargetNodes) {
- if ( null == aName ) {
- throw new IllegalArgumentException("Name is required");
- } else if ( null == aTargetNodes ) {
- throw new IllegalArgumentException("Node names is required");
- }
- name = aName;
- targetNodes = aTargetNodes;
- idToReplicant = new HashMap();
- }
-
- public String getName() {
- return name;
- }
-
- public void fireUpdateEvent(UpdateEvent anEvent) {
- // One does not send the actual ReplicantCapable in the case of an
- // update. Instead, one sends only its identifier.
- ReplicationCapable target = (ReplicationCapable) anEvent.getTarget();
- anEvent.setTarget(target.getID());
- sender.sendSyncRequest(
- new CommandRequest("mergeWithUpdate", new Object[] {anEvent}),
- requestOut, targetNodes);
- }
-
- /**
* Merges an UpdateEvent with a registered ReplicationCapable.
*
* @param anEvent Update event to be merged.
@@ -133,18 +34,7 @@
* performed.
*/
public void mergeWithUpdate(UpdateEvent anEvent)
- throws ReplicationException {
- ReplicantID id = (ReplicantID) anEvent.getTarget();
- ReplicationCapable replicationCapable;
- synchronized(idToReplicant) {
- replicationCapable = (ReplicationCapable) idToReplicant.get(id);
- }
- if ( null == replicationCapable ) {
- throw new ReplicationException(
- "No ReplicantCapable with the id {" + id + "}");
- }
- replicationCapable.mergeWithUpdate(anEvent);
- }
+ throws ReplicationException;
/**
* Registers a ReplicantCapable. From now, UpdateEvents multicasted
@@ -153,18 +43,7 @@
*
* @param aReplicant ReplicantCapable to be controlled by this group.
*/
- public void registerReplicantCapable(ReplicationCapable aReplicant) {
- ReplicantID id = new ReplicantID();
- aReplicant.setID(id);
- sender.sendSyncRequest(
- new CommandRequest("registerLocalReplicantCapable",
- new Object[] {aReplicant}),
- requestOut, targetNodes);
- synchronized(idToReplicant) {
- idToReplicant.put(id, aReplicant);
- aReplicant.addUpdateListener(this);
- }
- }
+ public void registerReplicantCapable(ReplicationCapable aReplicant);
/**
* This method is for internal use only.
@@ -174,12 +53,7 @@
*
* @param aReplicant ReplicantCapable to be locally registered.
*/
- public void registerLocalReplicantCapable(ReplicationCapable aReplicant)
{
- synchronized(idToReplicant) {
- aReplicant.addUpdateListener(this);
- idToReplicant.put(aReplicant.getID(), aReplicant);
- }
- }
+ public void registerLocalReplicantCapable(ReplicationCapable aReplicant);
/**
* Retrieves the ReplicationCapable having the specified id.
@@ -188,133 +62,6 @@
* @return ReplicantCapable having the specified id or null if such an
* identifier is not known.
*/
- public ReplicationCapable retrieveReplicantCapable(Object anID) {
- synchronized(idToReplicant) {
- return (ReplicationCapable) idToReplicant.get(anID);
- }
- }
-
- public void setContext(ServerNodeContext aContext) {
- serverNodeContext = aContext;
- sender = aContext.getRequestSender();
- MsgOutInterceptor out = aContext.getOutput();
- if ( null != out ) {
- out =
- new HeaderOutInterceptor(
- MsgHeaderConstants.DEST_CONNECTOR,
- name,
- out);
- requestOut =
- new HeaderOutInterceptor(
- MsgHeaderConstants.BODY_TYPE,
- MsgBody.Type.REQUEST,
- out);
- resultOut =
- new HeaderOutInterceptor(
- MsgHeaderConstants.BODY_TYPE,
- MsgBody.Type.RESPONSE,
- out);
- } else {
- requestOut = null;
- resultOut = null;
- }
- }
-
- public void deliver(Msg aMsg) {
- MsgHeader header = aMsg.getHeader();
- MsgBody.Type bodyType =
- (MsgBody.Type) header.getHeader(MsgHeaderConstants.BODY_TYPE);
- if ( bodyType.equals(MsgBody.Type.REQUEST) ) {
- handleRequest(aMsg);
- } else if ( bodyType.equals(MsgBody.Type.RESPONSE) ) {
- handleResponse(aMsg);
- }
- }
-
- /**
- * Handles a request Msg.
- *
- * @param aMsg Request Msg to be handled.
- */
- protected void handleRequest(Msg aMsg) {
- MsgBody body = aMsg.getBody();
- MsgHeader header = aMsg.getHeader();
- Object sourceNode = header.getHeader(MsgHeaderConstants.SRC_NODE);
- Object id = header.getHeader(MsgHeaderConstants.CORRELATION_ID);
- CommandRequest command;
- String gateway;
- command = (CommandRequest) body.getContent();
- command.setTarget(this);
- CommandResult result = command.execute();
- Msg msg = new Msg();
- body = msg.getBody();
- body.setContent(result);
- MsgOutInterceptor reqOut =
- new HeaderOutInterceptor(
- MsgHeaderConstants.CORRELATION_ID,
- id,
- new HeaderOutInterceptor(
- MsgHeaderConstants.DEST_NODES,
- targetNodes,
- new HeaderOutInterceptor(
- MsgHeaderConstants.DEST_CONNECTOR,
- name,
- resultOut)));
- reqOut.push(msg);
- }
-
- /**
- * Handles a response Msg.
- *
- * @param aMsg Response to be handled.
- */
- protected void handleResponse(Msg aMsg) {
- MsgBody body = aMsg.getBody();
- MsgHeader header = aMsg.getHeader();
- CommandResult result;
- result = (CommandResult) body.getContent();
- sender.setResponse(
- header.getHeader(MsgHeaderConstants.CORRELATION_ID),
- result);
- }
+ public ReplicationCapable retrieveReplicantCapable(Object anID);
- public void setGBeanContext(GBeanContext context) {
- }
-
- public void doStart() throws WaitingException, Exception {
- }
-
- public void doStop() throws WaitingException, Exception {
- }
-
- public void doFail() {
- }
-
- /**
- * ReplicantCapable identifier.
- */
- private static class ReplicantID implements Externalizable {
- private static volatile int seqId = 0;
- private int id;
- public ReplicantID() {
- id = seqId++;
- }
- public int hashCode() {
- return id;
- }
- public boolean equals(Object obj) {
- if ( false == obj instanceof ReplicantID ) {
- return false;
- }
- ReplicantID replicantID = (ReplicantID) obj;
- return id == replicantID.id;
- }
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(id);
- }
- public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
- id = in.readInt();
- }
- }
-
-}
+}
\ No newline at end of file
1.1
incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationMemberImpl.java
Index: ReplicationMemberImpl.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.replication;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.geronimo.datastore.impl.remote.messaging.AbstractConnector;
import org.apache.geronimo.datastore.impl.remote.messaging.CommandRequest;
import org.apache.geronimo.datastore.impl.remote.messaging.CommandResult;
import
org.apache.geronimo.datastore.impl.remote.messaging.HeaderOutInterceptor;
import org.apache.geronimo.datastore.impl.remote.messaging.Msg;
import org.apache.geronimo.datastore.impl.remote.messaging.MsgBody;
import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeader;
import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeaderConstants;
import org.apache.geronimo.datastore.impl.remote.messaging.MsgOutInterceptor;
import org.apache.geronimo.datastore.impl.remote.messaging.Node;
import org.apache.geronimo.datastore.impl.remote.messaging.NodeInfo;
import org.apache.geronimo.datastore.impl.remote.messaging.NodeContext;
import org.apache.geronimo.gbean.GAttributeInfo;
import org.apache.geronimo.gbean.GBean;
import org.apache.geronimo.gbean.GBeanInfo;
import org.apache.geronimo.gbean.GBeanInfoFactory;
import org.apache.geronimo.gbean.GConstructorInfo;
/**
* A replication group member.
* <BR>
* This is a Connector in charge of replicating the state of registered
* ReplicantCapables across N-nodes, which constitute a replication group.
* <BR>
* Replication members are organized as follow:
* <pre>
* ReplicationMember -- MTO -- ServerNode -- MTM -- ServerNode -- OTM --
ReplicationMember
* </pre>
*
* @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:06 $
*/
public class ReplicationMemberImpl
extends AbstractConnector
implements ReplicationMember, GBean
{
/**
* Name of the replication group.
*/
private final String name;
/**
* ReplicantID to ReplicantCapable Map.
*/
private final Map idToReplicant;
/**
* Nodes hosting the other members of the replication group
* of this member.
*/
private NodeInfo[] targetNodes;
/**
* Output to be used to send requests.
*/
private MsgOutInterceptor requestOut;
/**
* Output to be used to send results.
*/
private MsgOutInterceptor resultOut;
/**
* Creates a replication group member.
*
* @param aNode Node containing this instance.
* @param aName Name of the replication group owning this member.
* @param aTargetNodes Nodes hosting the other members of the
* replication group containing this member.
*/
public ReplicationMemberImpl(Node aNode,
String aName, NodeInfo[] aTargetNodes) {
super(aNode);
if ( null == aName ) {
throw new IllegalArgumentException("Name is required");
} else if ( null == aTargetNodes ) {
throw new IllegalArgumentException("Node names is required");
}
name = aName;
targetNodes = aTargetNodes;
idToReplicant = new HashMap();
}
public String getName() {
return name;
}
public void fireUpdateEvent(UpdateEvent anEvent) {
// One does not send the actual ReplicantCapable in the case of an
// update. Instead, one sends only its identifier.
ReplicationCapable target = (ReplicationCapable) anEvent.getTarget();
anEvent.setTarget(target.getID());
sender.sendSyncRequest(
new CommandRequest("mergeWithUpdate", new Object[] {anEvent}),
requestOut, targetNodes);
}
/**
* Merges an UpdateEvent with a registered ReplicationCapable.
*
* @param anEvent Update event to be merged.
* @throws ReplicationException Indicates that the merge can not be
* performed.
*/
public void mergeWithUpdate(UpdateEvent anEvent)
throws ReplicationException {
ReplicantID id = (ReplicantID) anEvent.getTarget();
ReplicationCapable replicationCapable;
synchronized(idToReplicant) {
replicationCapable = (ReplicationCapable) idToReplicant.get(id);
}
if ( null == replicationCapable ) {
throw new ReplicationException(
"No ReplicantCapable with the id {" + id + "}");
}
replicationCapable.mergeWithUpdate(anEvent);
}
/**
* Registers a ReplicantCapable. From now, UpdateEvents multicasted
* by the provided ReplicantCapable are also pushed to the replication
* group.
*
* @param aReplicant ReplicantCapable to be controlled by this group.
*/
public void registerReplicantCapable(ReplicationCapable aReplicant) {
ReplicantID id = new ReplicantID();
aReplicant.setID(id);
sender.sendSyncRequest(
new CommandRequest("registerLocalReplicantCapable",
new Object[] {aReplicant}),
requestOut, targetNodes);
synchronized(idToReplicant) {
idToReplicant.put(id, aReplicant);
aReplicant.addUpdateListener(this);
}
}
/**
* This method is for internal use only.
* <BR>
* It registers with this member a ReplicationCapable, which has been
* registered by a remote member.
*
* @param aReplicant ReplicantCapable to be locally registered.
*/
public void registerLocalReplicantCapable(ReplicationCapable aReplicant) {
synchronized(idToReplicant) {
aReplicant.addUpdateListener(this);
idToReplicant.put(aReplicant.getID(), aReplicant);
}
}
/**
* Retrieves the ReplicationCapable having the specified id.
*
* @param anID Replicant identifier.
* @return ReplicantCapable having the specified id or null if such an
* identifier is not known.
*/
public ReplicationCapable retrieveReplicantCapable(Object anID) {
synchronized(idToReplicant) {
return (ReplicationCapable) idToReplicant.get(anID);
}
}
public void setContext(NodeContext aContext) {
super.setContext(aContext);
if ( null != out ) {
out =
new HeaderOutInterceptor(
MsgHeaderConstants.DEST_CONNECTOR,
name,
out);
requestOut =
new HeaderOutInterceptor(
MsgHeaderConstants.BODY_TYPE,
MsgBody.Type.REQUEST,
out);
resultOut =
new HeaderOutInterceptor(
MsgHeaderConstants.BODY_TYPE,
MsgBody.Type.RESPONSE,
out);
} else {
requestOut = null;
resultOut = null;
}
}
protected void handleRequest(Msg aMsg) {
MsgBody body = aMsg.getBody();
MsgHeader header = aMsg.getHeader();
Object sourceNode = header.getHeader(MsgHeaderConstants.SRC_NODE);
Object id = header.getHeader(MsgHeaderConstants.CORRELATION_ID);
CommandRequest command;
String gateway;
command = (CommandRequest) body.getContent();
command.setTarget(this);
CommandResult result = command.execute();
Msg msg = new Msg();
body = msg.getBody();
body.setContent(result);
MsgOutInterceptor reqOut =
new HeaderOutInterceptor(
MsgHeaderConstants.CORRELATION_ID,
id,
new HeaderOutInterceptor(
MsgHeaderConstants.DEST_NODES,
targetNodes,
new HeaderOutInterceptor(
MsgHeaderConstants.DEST_CONNECTOR,
name,
resultOut)));
reqOut.push(msg);
}
/**
* ReplicantCapable identifier.
*/
private static class ReplicantID implements Externalizable {
private static volatile int seqId = 0;
private int id;
public ReplicantID() {
id = seqId++;
}
public int hashCode() {
return id;
}
public boolean equals(Object obj) {
if ( false == obj instanceof ReplicantID ) {
return false;
}
ReplicantID replicantID = (ReplicantID) obj;
return id == replicantID.id;
}
public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(id);
}
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
id = in.readInt();
}
}
public static final GBeanInfo GBEAN_INFO;
static {
GBeanInfoFactory infoFactory = new GBeanInfoFactory("Replication
Member", ReplicationMemberImpl.class.getName(), AbstractConnector.GBEAN_INFO);
infoFactory.addAttribute(new GAttributeInfo("TargetNodes", true));
infoFactory.addOperation("registerReplicantCapable", new Class[]
{ReplicationCapable.class});
infoFactory.addOperation("retrieveReplicantCapable", new Class[]
{Object.class});
infoFactory.setConstructor(new GConstructorInfo(
Arrays.asList(new Object[]{"Node", "Name", "TargetNodes"}),
Arrays.asList(new Object[]{Node.class, String.class,
NodeInfo[].class})));
GBEAN_INFO = infoFactory.getBeanInfo();
}
public static GBeanInfo getGBeanInfo() {
return GBEAN_INFO;
}
}
1.3 +38 -32
incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging/DummyConnector.java
Index: DummyConnector.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging/DummyConnector.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- DummyConnector.java 18 Mar 2004 12:14:05 -0000 1.2
+++ DummyConnector.java 24 Mar 2004 11:37:06 -0000 1.3
@@ -20,20 +20,31 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.geronimo.gbean.GAttributeInfo;
+import org.apache.geronimo.gbean.GBean;
+import org.apache.geronimo.gbean.GBeanInfo;
+import org.apache.geronimo.gbean.GBeanInfoFactory;
+
/**
*
* @version $Revision$ $Date$
*/
-public class DummyConnector implements Connector {
+public class DummyConnector
+ extends AbstractConnector
+ implements Connector, GBean {
private final String name;
private final List received;
private final NodeInfo[] targetNodes;
- private RequestSender sender;
- private MsgOutInterceptor out;
- protected ServerNodeContext serverNodeContext;
- public DummyConnector(String aName, NodeInfo[] aTargetNodes) {
+ public DummyConnector(Node aNode,
+ String aName, NodeInfo[] aTargetNodes) {
+ super(aNode);
+ if ( null == aName ) {
+ throw new IllegalArgumentException("Name is required.");
+ } else if ( null == aTargetNodes ) {
+ throw new IllegalArgumentException("Target nodes is required.");
+ }
name = aName;
targetNodes = aTargetNodes;
received = new ArrayList();
@@ -55,10 +66,8 @@
out), targetNodes);
}
- public void setContext(ServerNodeContext aContext) {
- serverNodeContext = aContext;
- sender = aContext.getRequestSender();
- out = aContext.getOutput();
+ public void setContext(NodeContext aContext) {
+ super.setContext(aContext);
if ( null != out ) {
out =
new HeaderOutInterceptor(
@@ -72,23 +81,12 @@
return received;
}
- public void deliver(Msg aMsg) {
- MsgHeader header = aMsg.getHeader();
- MsgBody.Type bodyType =
- (MsgBody.Type) header.getHeader(MsgHeaderConstants.BODY_TYPE);
- if ( MsgBody.Type.REQUEST == bodyType ) {
- handleRequest(aMsg);
- } else if ( MsgBody.Type.RESPONSE == bodyType ) {
- handleResponse(aMsg);
- }
- }
-
- public void handleRequest(Msg aMsg) {
+ protected void handleRequest(Msg aMsg) {
MsgHeader header = aMsg.getHeader();
MsgBody body = aMsg.getBody();
Object id = header.getHeader(MsgHeaderConstants.CORRELATION_ID);
- Object node = header.getHeader(MsgHeaderConstants.SRC_NODE);
+ Object srcNode = header.getHeader(MsgHeaderConstants.SRC_NODE);
received.add(body.getContent());
@@ -104,7 +102,7 @@
MsgBody.Type.RESPONSE,
new HeaderOutInterceptor(
MsgHeaderConstants.DEST_NODES,
- node,
+ srcNode,
new HeaderOutInterceptor(
MsgHeaderConstants.BODY_TYPE,
MsgBody.Type.RESPONSE,
@@ -112,14 +110,22 @@
reqOut.push(msg);
}
- private void handleResponse(Msg aMsg) {
- MsgBody body = aMsg.getBody();
- MsgHeader header = aMsg.getHeader();
- CommandResult result;
- result = (CommandResult) body.getContent();
- sender.setResponse(
- header.getHeader(MsgHeaderConstants.CORRELATION_ID),
- result);
+ public static final GBeanInfo GBEAN_INFO;
+
+ static {
+ GBeanInfoFactory factory = new
GBeanInfoFactory(DummyConnector.class, AbstractConnector.GBEAN_INFO);
+ factory.setConstructor(
+ new String[] {"Node", "Name", "TargetNodes"},
+ new Class[] {Node.class, String.class, NodeInfo[].class});
+ factory.addAttribute(new GAttributeInfo("TargetNodes", true));
+ factory.addAttribute(new GAttributeInfo("Received", false));
+ factory.addOperation("raiseISException");
+ factory.addOperation("sendRawObject", new Class[]{Object.class});
+ GBEAN_INFO = factory.getBeanInfo();
+ }
+
+ public static GBeanInfo getGBeanInfo() {
+ return GBEAN_INFO;
}
}
1.2 +7 -2
incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging/CommandRequestTest.java
Index: CommandRequestTest.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging/CommandRequestTest.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- CommandRequestTest.java 11 Mar 2004 15:36:14 -0000 1.1
+++ CommandRequestTest.java 24 Mar 2004 11:37:06 -0000 1.2
@@ -17,6 +17,8 @@
package org.apache.geronimo.datastore.impl.remote.messaging;
+import java.net.InetAddress;
+
import junit.framework.TestCase;
/**
@@ -29,7 +31,10 @@
private static final String name = "test";
protected void setUp() throws Exception {
- connector = new DummyConnector(name, new NodeInfo[0]);
+ connector = new DummyConnector(
+ new NodeImpl(
+ new NodeInfo("test", InetAddress.getLocalHost(), 4040)),
+ name, new NodeInfo[0]);
}
public void testExecute0() throws Exception {
1.1
incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging/NodeTest.java
Index: NodeTest.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.messaging;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.Collections;
import java.util.List;
import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.geronimo.datastore.impl.remote.messaging.Topology.NodePath;
import
org.apache.geronimo.datastore.impl.remote.messaging.Topology.PathWeight;
import org.apache.geronimo.gbean.jmx.GBeanMBean;
import org.apache.geronimo.kernel.Kernel;
/**
 * Tests the exchange of Msgs between four Nodes, each one running in its own
 * Kernel and chained as Node1 -- Node2 -- Node3 -- Node4. DummyConnectors
 * are mounted on Node1, Node2 and Node4; Node3 hosts no connector.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:06 $
 */
public class NodeTest
    extends TestCase
{

    private Kernel kernel1;
    private ObjectName node1Name;
    private ObjectName dummy1Node1Name;
    private DummyConnector dummy1Node1;
    private ObjectName dummy11Node1Name;
    private DummyConnector dummy11Node1;

    private Kernel kernel2;
    private ObjectName node2Name;
    private ObjectName dummy1Node2Name;
    private DummyConnector dummy1Node2;
    private ObjectName dummy11Node2Name;
    private DummyConnector dummy11Node2;

    private Kernel kernel3;
    private ObjectName node3Name;

    private Kernel kernel4;
    private ObjectName node4Name;
    private ObjectName dummy1Node4Name;
    private DummyConnector dummy1Node4;

    /**
     * Loads a GBean into the provided kernel and starts it.
     */
    private void loadAndStart(Kernel kernel, ObjectName name,
        GBeanMBean instance)
        throws Exception {
        kernel.loadGBean(name, instance);
        kernel.startGBean(name);
    }

    /**
     * Stops a GBean and then unloads it from the provided kernel.
     */
    private void stopAndUnload(Kernel kernel, ObjectName name)
        throws Exception {
        kernel.stopGBean(name);
        kernel.unloadGBean(name);
    }

    /**
     * Creates and boots a kernel having the specified name.
     */
    private Kernel bootKernel(String aKernelName) throws Exception {
        Kernel kernel = new Kernel(aKernelName, "test");
        kernel.boot();
        return kernel;
    }

    /**
     * Creates a not yet loaded Node GBean for the provided NodeInfo.
     */
    private GBeanMBean newNodeGBean(NodeInfo aNodeInfo) throws Exception {
        GBeanMBean node = new GBeanMBean(NodeImpl.GBEAN_INFO);
        node.setAttribute("NodeInfo", aNodeInfo);
        return node;
    }

    /**
     * Creates a not yet loaded DummyConnector GBean referencing the Node
     * having the specified ObjectName.
     */
    private GBeanMBean newDummyGBean(ObjectName aNodeName,
        String aConnectorName, NodeInfo[] aTargetNodes)
        throws Exception {
        GBeanMBean dummy = new GBeanMBean(DummyConnector.GBEAN_INFO);
        dummy.setReferencePatterns("Node",
            Collections.singleton(aNodeName));
        dummy.setAttribute("Name", aConnectorName);
        dummy.setAttribute("TargetNodes", aTargetNodes);
        return dummy;
    }

    /**
     * Makes the Node wrapped by the provided GBean join another node.
     */
    private void join(GBeanMBean aNodeGB, NodeInfo aNodeInfo)
        throws Exception {
        Node node = (Node) aNodeGB.getTarget();
        node.join(aNodeInfo);
    }

    protected void setUp() throws Exception {
        InetAddress address = InetAddress.getLocalHost();
        NodeInfo nodeInfo1 = new NodeInfo("Node1", address, 8081);
        NodeInfo nodeInfo2 = new NodeInfo("Node2", address, 8082);
        NodeInfo nodeInfo3 = new NodeInfo("Node3", address, 8083);
        NodeInfo nodeInfo4 = new NodeInfo("Node4", address, 8084);

        // Set-up the first Node.
        kernel1 = bootKernel("test.kernel1");
        node1Name = new ObjectName("geronimo.test:role=node1");
        GBeanMBean node1 = newNodeGBean(nodeInfo1);
        dummy1Node1Name = new ObjectName("geronimo.test:name=dummy1");
        GBeanMBean dummy1Node1GB = newDummyGBean(node1Name, "dummy1",
            new NodeInfo[] {nodeInfo2, nodeInfo4});
        dummy11Node1Name = new ObjectName("geronimo.test:name=dummy11");
        GBeanMBean dummy11Node1GB = newDummyGBean(node1Name, "dummy11",
            new NodeInfo[] {nodeInfo2});
        // The Node is started before the connectors referencing it.
        loadAndStart(kernel1, node1Name, node1);
        loadAndStart(kernel1, dummy1Node1Name, dummy1Node1GB);
        loadAndStart(kernel1, dummy11Node1Name, dummy11Node1GB);
        dummy1Node1 = (DummyConnector) dummy1Node1GB.getTarget();
        dummy11Node1 = (DummyConnector) dummy11Node1GB.getTarget();

        // Set-up the second Node.
        kernel2 = bootKernel("test.kernel2");
        node2Name = new ObjectName("geronimo.test:role=node2");
        GBeanMBean node2 = newNodeGBean(nodeInfo2);
        dummy1Node2Name = new ObjectName("geronimo.test:name=dummy1");
        GBeanMBean dummy1Node2GB = newDummyGBean(node2Name, "dummy1",
            new NodeInfo[] {nodeInfo1, nodeInfo4});
        dummy11Node2Name = new ObjectName("geronimo.test:name=dummy11");
        GBeanMBean dummy11Node2GB = newDummyGBean(node2Name, "dummy11",
            new NodeInfo[] {nodeInfo1});
        loadAndStart(kernel2, node2Name, node2);
        loadAndStart(kernel2, dummy1Node2Name, dummy1Node2GB);
        loadAndStart(kernel2, dummy11Node2Name, dummy11Node2GB);
        dummy1Node2 = (DummyConnector) dummy1Node2GB.getTarget();
        dummy11Node2 = (DummyConnector) dummy11Node2GB.getTarget();
        // The second Node joins the first one.
        join(node2, nodeInfo1);

        // Set-up the third Node.
        kernel3 = bootKernel("test.kernel3");
        node3Name = new ObjectName("geronimo.test:role=node3");
        GBeanMBean node3 = newNodeGBean(nodeInfo3);
        loadAndStart(kernel3, node3Name, node3);
        // The third Node joins the second one.
        join(node3, nodeInfo2);

        // Set-up the fourth Node.
        kernel4 = bootKernel("test.kernel4");
        node4Name = new ObjectName("geronimo.test:role=node4");
        GBeanMBean node4 = newNodeGBean(nodeInfo4);
        dummy1Node4Name = new ObjectName("geronimo.test:name=dummy1");
        GBeanMBean dummy1Node4GB = newDummyGBean(node4Name, "dummy1",
            new NodeInfo[] {nodeInfo1, nodeInfo2});
        loadAndStart(kernel4, node4Name, node4);
        loadAndStart(kernel4, dummy1Node4Name, dummy1Node4GB);
        dummy1Node4 = (DummyConnector) dummy1Node4GB.getTarget();
        // The fourth Node joins the third one.
        join(node4, nodeInfo3);

        // Sets the topology.
        Topology topology = new Topology();
        PathWeight weight = new PathWeight(10);
        topology.addPath(new NodePath(nodeInfo1, nodeInfo2, weight, weight));
        topology.addPath(new NodePath(nodeInfo2, nodeInfo3, weight, weight));
        topology.addPath(new NodePath(nodeInfo3, nodeInfo4, weight, weight));
        kernel1.setAttribute(node1Name, "Topology", topology);
        kernel2.setAttribute(node2Name, "Topology", topology);
        kernel3.setAttribute(node3Name, "Topology", topology);
        kernel4.setAttribute(node4Name, "Topology", topology);
    }

    /**
     * Stops and unloads the GBeans of each kernel, in the order they have
     * been started, and shuts the kernels down.
     * <BR>
     * Each kernel is shut down even if one of the previous steps fails, so
     * that a failure does not leave kernels running for the next test.
     */
    protected void tearDown() throws Exception {
        try {
            release(kernel1, new ObjectName[] {
                node1Name, dummy1Node1Name, dummy11Node1Name});
        } finally {
            try {
                release(kernel2, new ObjectName[] {
                    node2Name, dummy1Node2Name, dummy11Node2Name});
            } finally {
                try {
                    release(kernel3, new ObjectName[] {node3Name});
                } finally {
                    release(kernel4, new ObjectName[] {
                        node4Name, dummy1Node4Name});
                }
            }
        }
    }

    /**
     * Stops and unloads the specified GBeans, in order, and then shuts the
     * kernel down. The shutdown is always attempted.
     */
    private void release(Kernel kernel, ObjectName[] names)
        throws Exception {
        try {
            for (int i = 0; i < names.length; i++) {
                stopAndUnload(kernel, names[i]);
            }
        } finally {
            kernel.shutdown();
        }
    }

    /**
     * An object sent by dummy1 of Node1 must be received once by dummy1 of
     * Node2 and once by dummy1 of Node4.
     */
    public void testMulticast() throws Exception {
        dummy1Node1.sendRawObject("Test1");
        List list = dummy1Node2.getReceived();
        assertEquals(1, list.size());
        assertEquals("Test1", list.remove(0));
        list = dummy1Node4.getReceived();
        assertEquals(1, list.size());
        assertEquals("Test1", list.remove(0));
    }

    /**
     * Manual load driver: sets the nodes up once and runs
     * testSendRawPerformance in an endless loop. It never returns and thus
     * never tears the nodes down.
     */
    public static void main(String[] args) throws Exception {
        NodeTest test = new NodeTest();
        test.setUp();
        while ( true ) {
            test.testSendRawPerformance();
        }
    }

    /**
     * Sends 1000 null objects from dummy11 of Node1 to dummy11 of Node2 and
     * checks that each one is received before the next one is sent.
     */
    public void testSendRawPerformance() throws Exception {
        List list = dummy11Node2.getReceived();
        int iter = 1000;
        long start = System.currentTimeMillis();
        for(int i = 0; i < iter; i++) {
            dummy11Node1.sendRawObject(null);
            assertEquals(1, list.size());
            list.remove(0);
        }
        long end = System.currentTimeMillis();
        System.out.println("#calls={" + iter + "}; Time={" + (end-start) +
            "}");
        // TODO update when compression is implemented.
        assertTrue((end - start) < 3000);
    }

    /**
     * Compares the time needed to read an InputStream directly with the
     * time needed to read the same amount of bytes from an InputStream
     * having been sent from Node1 to Node2.
     */
    public void testInputStreamPerformance() throws Exception {
        long nbBytes = 1024 * 1024;
        InputStream in = new DummyInputStream(nbBytes);
        long baseLine = timeRead(in);
        in = new DummyInputStream(nbBytes);
        dummy11Node1.sendRawObject(in);
        List list = dummy11Node2.getReceived();
        assertEquals(1, list.size());
        in = (InputStream) list.remove(0);
        long time = timeRead(in);
        System.out.println("#bytes={" + nbBytes +
            "}; Baseline={" + baseLine + "}; Time={" + time + "}");
        // The clock has a millisecond resolution: a baseline measured as 0
        // would make the assertion impossible to satisfy, hence the floor.
        // TODO update when compression is implemented.
        assertTrue(Math.max(baseLine, 1) * 100 > time);
    }

    /**
     * Reads the provided stream byte per byte until its end.
     *
     * @return Elapsed time in milliseconds.
     */
    private long timeRead(InputStream anIn)
        throws Exception {
        long start = System.currentTimeMillis();
        while ( -1 != anIn.read() ) {}
        return System.currentTimeMillis() - start;
    }

    /**
     * InputStream returning the byte 1 the specified number of times and
     * then signalling its end.
     */
    private static class DummyInputStream extends InputStream {
        private final long size;
        private long curPos = 0;
        private DummyInputStream(long aSize) {
            size = aSize;
        }
        public int read() throws IOException {
            if ( curPos++ < size ) {
                return 1;
            }
            return -1;
        }
    }

}
1.3 +88 -30
incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/replication/ReplicationTest.java
Index: ReplicationTest.java
===================================================================
RCS file:
/home/cvs/incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/replication/ReplicationTest.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- ReplicationTest.java 11 Mar 2004 15:36:15 -0000 1.2
+++ ReplicationTest.java 24 Mar 2004 11:37:06 -0000 1.3
@@ -22,60 +22,118 @@
import java.util.HashMap;
import java.util.Map;
+import javax.management.ObjectName;
+
import junit.framework.TestCase;
+import org.apache.geronimo.datastore.impl.remote.messaging.Node;
import org.apache.geronimo.datastore.impl.remote.messaging.NodeInfo;
+import org.apache.geronimo.datastore.impl.remote.messaging.NodeImpl;
import org.apache.geronimo.datastore.impl.remote.messaging.Topology;
-import org.apache.geronimo.datastore.impl.remote.messaging.ServerNode;
import org.apache.geronimo.datastore.impl.remote.messaging.Topology.NodePath;
import
org.apache.geronimo.datastore.impl.remote.messaging.Topology.PathWeight;
+import org.apache.geronimo.gbean.jmx.GBeanMBean;
+import org.apache.geronimo.kernel.Kernel;
/**
*
* @version $Revision$ $Date$
*/
public class ReplicationTest extends TestCase {
-
- SimpleReplicatedMap replicant1;
- ReplicationMember replication1;
- ReplicationMember replication2;
+ private Kernel kernel1;
+ private ObjectName node1Name;
+ private ObjectName repNode1Name;
+ private ReplicationMember repNode1;
+
+ private Kernel kernel2;
+ private ObjectName node2Name;
+ private ObjectName repNode2Name;
+ private ReplicationMember repNode2;
+
+ private void loadAndStart(Kernel kernel, ObjectName name, GBeanMBean
instance)
+ throws Exception {
+ kernel.loadGBean(name, instance);
+ kernel.startGBean(name);
+ }
+
+ private void unloadAndStop(Kernel kernel, ObjectName name)
+ throws Exception {
+ kernel.stopGBean(name);
+ kernel.unloadGBean(name);
+ }
+
protected void setUp() throws Exception {
InetAddress address = InetAddress.getLocalHost();
- NodeInfo nodeInfo1 = new NodeInfo("Node1", address, 8080);
- NodeInfo nodeInfo2 = new NodeInfo("Node2", address, 8082);
+ NodeInfo primaryNode = new NodeInfo("Primary", address, 8080);
+ NodeInfo secondaryNode = new NodeInfo("Secondary", address, 8082);
- replicant1 = new SimpleReplicatedMap();
- replication1 = new ReplicationMember("Replication1",
- new NodeInfo[] {nodeInfo2});
- ServerNode server1 = new ServerNode(nodeInfo1,
- Collections.singleton(replication1));
- server1.doStart();
- replication1.doStart();
-
- replication2 = new ReplicationMember("Replication1",
- new NodeInfo[] {nodeInfo1});
- ServerNode server2 = new ServerNode(nodeInfo2,
- Collections.singleton(replication2));
- server2.doStart();
- replication2.doStart();
+
+ // Set-up the first ServerNode.
+ kernel1 = new Kernel("test.kernel1", "test");
+ kernel1.boot();
+
+ node1Name = new ObjectName("geronimo.test:role=node1");
+ GBeanMBean node1GB = new GBeanMBean(NodeImpl.GBEAN_INFO);
+ node1GB.setAttribute("NodeInfo", primaryNode);
+ repNode1Name = new ObjectName("geronimo.test:role=replication");
+ GBeanMBean repNode1GB = new GBeanMBean(ReplicationMemberImpl.GBEAN_INFO);
+ repNode1GB.setReferencePatterns("Node",
+ Collections.singleton(node1Name));
+ repNode1GB.setAttribute("Name", "Replication");
+ repNode1GB.setAttribute("TargetNodes", new NodeInfo[] {secondaryNode});
+ loadAndStart(kernel1, repNode1Name, repNode1GB);
+ loadAndStart(kernel1, node1Name, node1GB);
+ repNode1 = (ReplicationMember) repNode1GB.getTarget();
+
+ // Set-up the second ServerNode.
+ kernel2 = new Kernel("test.kernel2", "test");
+ kernel2.boot();
+
+ node2Name = new ObjectName("geronimo.test:role=node2");
+ GBeanMBean node2GB = new GBeanMBean(NodeImpl.GBEAN_INFO);
+ node2GB.setAttribute("NodeInfo", secondaryNode);
+ repNode2Name = new ObjectName("geronimo.test:role=replication");
+ GBeanMBean repNode2GB = new GBeanMBean(ReplicationMemberImpl.GBEAN_INFO);
+ repNode2GB.setReferencePatterns("Node",
+ Collections.singleton(node2Name));
+ repNode2GB.setAttribute("Name", "Replication");
+ repNode2GB.setAttribute("TargetNodes", new NodeInfo[] {primaryNode});
+ loadAndStart(kernel2, repNode2Name, repNode2GB);
+ loadAndStart(kernel2, node2Name, node2GB);
+ repNode2 = (ReplicationMember) repNode2GB.getTarget();
- server2.join(nodeInfo1);
+ Node node = (Node) node2GB.getTarget();
+ // The second ServerNode joins the first one.
+ node.join(primaryNode);
+ // Sets the topology.
Topology topology = new Topology();
PathWeight weight = new PathWeight(10);
- NodePath path = new NodePath(nodeInfo1, nodeInfo2, weight, weight);
+ NodePath path = new NodePath(primaryNode, secondaryNode, weight, weight);
topology.addPath(path);
- server2.setTopology(topology);
- server1.setTopology(topology);
+
+ kernel1.setAttribute(node1Name, "Topology", topology);
+ kernel2.setAttribute(node2Name, "Topology", topology);
}
- public void testUseCase() {
+ protected void tearDown() throws Exception {
+ unloadAndStop(kernel1, repNode1Name);
+ unloadAndStop(kernel1, node1Name);
+ kernel1.shutdown();
+
+ unloadAndStop(kernel2, repNode2Name);
+ unloadAndStop(kernel2, node2Name);
+ kernel2.shutdown();
+ }
+
+ public void testUseCase() throws Exception {
+ SimpleReplicatedMap replicant1 = new SimpleReplicatedMap();
replicant1.put("test1", "value1");
- replication1.registerReplicantCapable(replicant1);
+ repNode1.registerReplicantCapable(replicant1);
Object id = replicant1.getID();
- SimpleReplicatedMap replicant2 =
- (SimpleReplicatedMap) replication2.retrieveReplicantCapable(id);
+ SimpleReplicatedMap replicant2 = (SimpleReplicatedMap)
+ repNode2.retrieveReplicantCapable(id);
assertNotNull("Not been registered", replicant2);
assertEquals("value1", replicant2.get("test1"));
replicant1.put("test2", "value2");