User: norbert
Date: 00/06/18 21:23:15
Modified: src/java/org/spydermq/distributed/server
ConnectionReceiverOIL.java
ConnectionReceiverOILClient.java
ConnectionReceiverRMIImpl.java
DistributedJMSServerRMIImpl.java
Added: src/java/org/spydermq/distributed/server
DistributedJMSServerOIL.java
DistributedJMSServerOILClient.java
Log:
More work for the OIL
Revision Changes Path
1.4 +21 -6
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
Index: ConnectionReceiverOIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- ConnectionReceiverOIL.java 2000/06/15 23:21:40 1.3
+++ ConnectionReceiverOIL.java 2000/06/19 04:23:14 1.4
@@ -38,10 +38,10 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class ConnectionReceiverOIL
- implements ConnectionReceiver, Runnable
+ implements Runnable
{
// Attributes ----------------------------------------------------
@@ -59,7 +59,7 @@
}
// Internals -----------------------------------------------------
- // Should be hold in an "extention" and the ConnectionReceiver logic should be
in a separated object
+ // Should be hold in an "extension" and the ConnectionReceiver logic should be
in a separated object
// We could generate a dynamic proxy for that...
static final int RECEIVE = 1;
@@ -67,7 +67,7 @@
static final int DELETE_TEMPORARY_DESTINATION = 3;
static final int CLOSE = 4;
- ServerSocket serverSocket;
+ private ServerSocket serverSocket;
void exportObject()
{
@@ -166,8 +166,7 @@
}
public ConnectionReceiver createClient() throws Exception
- {
-
+ {
return new
ConnectionReceiverOILClient(InetAddress.getLocalHost(),serverSocket.getLocalPort());
}
@@ -178,6 +177,21 @@
this.connection=connection;
}
+ //<DEBUG>
+
+ /*public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
+ {
+ connection.rec++;
+ }
+
+ public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
JMSException
+ {
+ connection.rec++;
+ }*/
+
+ //</DEBUG>
+
+
//A message has arrived for this Connection, We have to dispatch it to the
sessions
public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
{
@@ -254,6 +268,7 @@
}
}
+
public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
JMSException
{
1.5 +1 -4
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java
Index: ConnectionReceiverOILClient.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- ConnectionReceiverOILClient.java 2000/06/15 23:27:48 1.4
+++ ConnectionReceiverOILClient.java 2000/06/19 04:23:14 1.5
@@ -56,6 +56,7 @@
public void waitAnswer() throws Exception
{
try {
+ os.flush();
int val=is.read();
if (val==1) {
String st=(String)in.readObject();
@@ -76,7 +77,6 @@
os.write(RECEIVE);
out.writeObject(dest);
out.writeObject(mes);
- os.flush();
waitAnswer();
}
@@ -86,7 +86,6 @@
os.write(RECEIVE_MULTIPLE);
out.writeObject(dest);
out.writeObject(mes);
- os.flush();
waitAnswer();
}
@@ -95,7 +94,6 @@
if (socket==null) createConnection();
os.write(DELETE_TEMPORARY_DESTINATION);
out.writeObject(dest);
- os.flush();
waitAnswer();
}
@@ -103,7 +101,6 @@
{
if (socket==null) createConnection();
os.write(CLOSE);
- os.flush();
waitAnswer();
}
1.10 +16 -1
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java
Index: ConnectionReceiverRMIImpl.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- ConnectionReceiverRMIImpl.java 2000/06/15 22:50:31 1.9
+++ ConnectionReceiverRMIImpl.java 2000/06/19 04:23:14 1.10
@@ -30,7 +30,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class ConnectionReceiverRMIImpl
extends UnicastRemoteObject
@@ -57,6 +57,21 @@
{
this.connection=connection;
}
+
+ //<DEBUG>
+
+ /*public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
+ {
+ connection.rec++;
+ }
+
+ public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
JMSException
+ {
+ connection.rec++;
+ }*/
+
+ //</DEBUG>
+
//A message has arrived for this Connection, We have to dispatch it to the
sessions
public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
1.3 +16 -10
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java
Index: DistributedJMSServerRMIImpl.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- DistributedJMSServerRMIImpl.java 2000/06/14 23:21:00 1.2
+++ DistributedJMSServerRMIImpl.java 2000/06/19 04:23:15 1.3
@@ -19,15 +19,19 @@
import org.spydermq.Log;
import java.rmi.server.UnicastRemoteObject;
import java.rmi.RemoteException;
+import org.spydermq.distributed.interfaces.DistributedJMSServer;
+import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
/**
* The RMI implementation of the DistributedJMSServer object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
-public class DistributedJMSServerRMIImpl extends UnicastRemoteObject implements
DistributedJMSServerRMI, DistributedJMSServerRMIImplMBean
+public class DistributedJMSServerRMIImpl
+ extends UnicastRemoteObject
+ implements DistributedJMSServerRMI, DistributedJMSServerRMIImplMBean,
DistributedJMSServerSetup
{
// Attributes ----------------------------------------------------
@@ -43,11 +47,6 @@
// Public --------------------------------------------------------
- public void setServer(JMSServer s)
- {
- server=s;
- }
-
public String getID() throws JMSException
{
return server.getID();
@@ -112,10 +111,17 @@
{
server.connectionListening(mode,dest,dc);
}
+
+ // --
+
+ public DistributedJMSServer createClient()
+ {
+ return this;
+ }
- public void finalize()
+ public void setServer(JMSServer s)
{
- Log.log("Distributed.finalize()");
+ server=s;
}
-
+
}
1.1
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java
Index: DistributedJMSServerOIL.java
===================================================================
package org.spydermq.distributed.server;
import javax.jms.JMSException;
import javax.jms.Destination;
import javax.jms.Topic;
import javax.jms.Queue;
import javax.jms.TemporaryTopic;
import javax.jms.TemporaryQueue;
import org.spydermq.SpyMessage;
import org.spydermq.SpyDestination;
import org.spydermq.JMSServer;
import org.spydermq.SpyDistributedConnection;
import org.spydermq.Log;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
import java.rmi.RemoteException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.InetAddress;
import java.io.ObjectOutputStream;
import java.io.BufferedOutputStream;
import java.io.ObjectInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.IOException;
public class DistributedJMSServerOIL
implements Runnable, DistributedJMSServerSetup
{
// Attributes ----------------------------------------------------
//The server implementation
private JMSServer server;
// Constructor ---------------------------------------------------
public DistributedJMSServerOIL()
{
exportObject();
}
// Internals -----------------------------------------------------
static final int GetID = 1;
static final int NewMessage = 2;
static final int Subscribe = 3;
static final int Unsubscribe = 4;
static final int CreateTopic = 5;
static final int CreateQueue = 6;
static final int GetTemporaryTopic = 7;
static final int GetTemporaryQueue = 8;
static final int ConnectionClosing = 9;
static final int DeleteTemporaryDestination = 10;
static final int CheckID = 11;
static final int QueueReceiveNoWait = 12;
static final int ConnectionListening = 13;
private ServerSocket serverSocket;
void exportObject()
{
try {
serverSocket = new ServerSocket(0);
new Thread(this).start();
} catch (IOException e) {
failure("Initialization",e);
}
}
public void run()
{
Socket socket = null;
int code = 0;
InputStream is=null;
OutputStream os=null;
ObjectOutputStream out=null;
ObjectInputStream in=null;
try {
socket = serverSocket.accept();
new Thread(this).start();
is = socket.getInputStream();
os = socket.getOutputStream();
out = new ObjectOutputStream(os);
in = new ObjectInputStream(is);
} catch (IOException e) {
failure("Initialisation",e);
return;
}
while (true) {
try {
code=is.read();
} catch (IOException e) {
failure("Command read",e);
return;
}
try {
Object result=null;
switch (code)
{
case GetID:
result=server.getID();
break;
case NewMessage:
server.newMessage((SpyMessage[])in.readObject(),(String)in.readObject());
break;
case Subscribe:
server.subscribe((Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
break;
case Unsubscribe:
server.unsubscribe((Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
break;
case CreateTopic:
result=(Topic)server.createTopic((String)in.readObject());
break;
case CreateQueue:
result=(Queue)server.createQueue((String)in.readObject());
break;
case GetTemporaryTopic:
result=(TemporaryTopic)server.getTemporaryTopic((SpyDistributedConnection)in.readObject());
break;
case GetTemporaryQueue:
result=(TemporaryQueue)server.getTemporaryQueue((SpyDistributedConnection)in.readObject());
break;
case ConnectionClosing:
server.connectionClosing((SpyDistributedConnection)in.readObject(),null);
break;
case DeleteTemporaryDestination:
server.deleteTemporaryDestination((SpyDestination)in.readObject());
break;
case CheckID:
server.checkID((String)in.readObject());
break;
case QueueReceiveNoWait:
result=server.queueReceiveNoWait((Queue)in.readObject());
break;
case ConnectionListening:
server.connectionListening(((is.read())==1),(Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
break;
default:
throw new RemoteException("Bad method
code !");
}
//Everthing was OK
try {
if (result==null) os.write(0);
else {
os.write(1);
out.writeObject(result);
}
os.flush();
} catch (IOException e) {
failure("Result write",e);
return;
}
} catch (Exception e) {
try {
os.write(2);
out.writeObject(e.getMessage());
os.flush();
} catch (IOException e2) {
failure("Result write",e2);
return;
}
}
}
}
void failure(String st,Exception e)
{
Log.error("Closing socket: "+st);
Log.error(e);
}
// --
public DistributedJMSServer createClient() throws Exception
{
return new
DistributedJMSServerOILClient(InetAddress.getLocalHost(),serverSocket.getLocalPort());
}
public void setServer(JMSServer s)
{
server=s;
}
}
1.1
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java
Index: DistributedJMSServerOILClient.java
===================================================================
package org.spydermq.distributed.server;
import javax.jms.JMSException;
import javax.jms.Destination;
import javax.jms.Topic;
import javax.jms.Queue;
import javax.jms.TemporaryTopic;
import javax.jms.TemporaryQueue;
import org.spydermq.SpyMessage;
import org.spydermq.SpyDestination;
import org.spydermq.JMSServer;
import org.spydermq.Log;
import org.spydermq.SpyDistributedConnection;
import java.rmi.RemoteException;
import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.Socket;
import java.net.InetAddress;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
public class DistributedJMSServerOILClient
implements DistributedJMSServer, Serializable
{
static final int GetID = 1;
static final int NewMessage = 2;
static final int Subscribe = 3;
static final int Unsubscribe = 4;
static final int CreateTopic = 5;
static final int CreateQueue = 6;
static final int GetTemporaryTopic = 7;
static final int GetTemporaryQueue = 8;
static final int ConnectionClosing = 9;
static final int DeleteTemporaryDestination = 10;
static final int CheckID = 11;
static final int QueueReceiveNoWait = 12;
static final int ConnectionListening = 13;
//Remote stuff
private transient Socket socket;
private transient InputStream is;
private transient OutputStream os;
private transient ObjectOutputStream out;
private transient ObjectInputStream in;
private int port;
private InetAddress addr;
public DistributedJMSServerOILClient(InetAddress addr,int port)
{
socket=null;
this.port=port;
this.addr=addr;
}
void createConnection() throws RemoteException
{
try {
socket=new Socket(addr,port);
is=socket.getInputStream();
os=socket.getOutputStream();
in=new ObjectInputStream(is);
out=new ObjectOutputStream(os);
} catch (Exception e) {
failure(e);
}
}
public Object waitAnswer() throws RemoteException
{
try {
os.flush();
int val=is.read();
if (val==0) return null;
if (val==1) {
return in.readObject();
} else {
String st=(String)in.readObject();
throw new RemoteException(st);
}
} catch (Exception e) {
failure(e);
return null;
}
}
void failure(Exception e) throws RemoteException
{
Log.error(e);
throw new RemoteException("Cannot contact the remote object");
}
//--- Remote Calls
public String getID() throws Exception
{
if (socket==null) createConnection();
try {
os.write(GetID);
} catch (IOException e) {
failure(e);
}
return (String)waitAnswer();
}
public void newMessage(SpyMessage val[],String id) throws JMSException,
RemoteException
{
if (socket==null) createConnection();
try {
os.write(NewMessage);
out.writeObject(val);
out.writeObject(id);
} catch (IOException e) {
failure(e);
}
waitAnswer();
}
public void subscribe(Destination dest,SpyDistributedConnection who) throws
JMSException, RemoteException
{
if (socket==null) createConnection();
try {
os.write(Subscribe);
out.writeObject(dest);
out.writeObject(who);
} catch (IOException e) {
failure(e);
}
waitAnswer();
}
public void unsubscribe(Destination dest,SpyDistributedConnection who) throws
JMSException, RemoteException
{
if (socket==null) createConnection();
try {
os.write(Unsubscribe);
out.writeObject(dest);
out.writeObject(who);
} catch (IOException e) {
failure(e);
}
waitAnswer();
}
public Topic createTopic(String dest) throws JMSException, RemoteException
{
if (socket==null) createConnection();
try {
os.write(CreateTopic);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
}
return (Topic)waitAnswer();
}
public Queue createQueue(String dest) throws JMSException, RemoteException
{
if (socket==null) createConnection();
try {
os.write(CreateQueue);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
}
return (Queue)waitAnswer();
}
public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws
JMSException, RemoteException
{
if (socket==null) createConnection();
try {
os.write(GetTemporaryTopic);
out.writeObject(dc);
} catch (IOException e) {
failure(e);
}
return (TemporaryTopic)waitAnswer();
}
public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
JMSException, RemoteException
{
if (socket==null) createConnection();
try {
os.write(GetTemporaryQueue);
out.writeObject(dc);
} catch (IOException e) {
failure(e);
}
return (TemporaryQueue)waitAnswer();
}
public void connectionClosing(SpyDistributedConnection dc) throws
JMSException, RemoteException
{
if (socket==null) createConnection();
try {
os.write(ConnectionClosing);
out.writeObject(dc);
} catch (IOException e) {
failure(e);
}
waitAnswer();
}
public void deleteTemporaryDestination(SpyDestination dest) throws
JMSException, RemoteException
{
if (socket==null) createConnection();
try {
os.write(DeleteTemporaryDestination);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
}
waitAnswer();
}
public void checkID(String ID) throws JMSException, RemoteException
{
if (socket==null) createConnection();
try {
os.write(CheckID);
out.writeObject(ID);
} catch (IOException e) {
failure(e);
}
waitAnswer();
}
public SpyMessage queueReceiveNoWait(Queue queue) throws Exception,
RemoteException
{
if (socket==null) createConnection();
try {
os.write(QueueReceiveNoWait);
out.writeObject(queue);
} catch (IOException e) {
failure(e);
}
return (SpyMessage)waitAnswer();
}
public void connectionListening(boolean mode,Destination
dest,SpyDistributedConnection dc) throws Exception, RemoteException
{
if (socket==null) createConnection();
try {
os.write(ConnectionListening);
if (mode) os.write(1);
else os.write(0);
out.writeObject(dest);
out.writeObject(dc);
} catch (IOException e) {
failure(e);
}
waitAnswer();
}
//--
public void setServer(JMSServer s) throws Exception
{
//Nothing !
}
}