User: norbert
Date: 00/06/15 15:50:32
Modified: src/java/org/spydermq/distributed/server
ConnectionReceiverOIL.java
ConnectionReceiverOILClient.java
ConnectionReceiverRMIImpl.java
Log:
More work on the OIL
Revision Changes Path
1.2 +39 -18
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
Index: ConnectionReceiverOIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ConnectionReceiverOIL.java 2000/06/15 04:10:00 1.1
+++ ConnectionReceiverOIL.java 2000/06/15 22:50:31 1.2
@@ -25,7 +25,9 @@
import java.rmi.RemoteException;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.InetAddress;
import java.io.ObjectOutputStream;
+import java.io.BufferedOutputStream;
import java.io.ObjectInputStream;
import java.io.InputStream;
import java.io.OutputStream;
@@ -36,9 +38,10 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
-public class ConnectionReceiverOIL implements ConnectionReceiver, Runnable
+public class ConnectionReceiverOIL
+ implements ConnectionReceiver, Runnable
{
// Attributes ----------------------------------------------------
@@ -65,15 +68,14 @@
static final int CLOSE = 4;
ServerSocket serverSocket;
-
- public void exportObject()
+
+ void exportObject()
{
try {
- //Should be dynamic...
- serverSocket = new ServerSocket(12345);
+ serverSocket = new ServerSocket(0);
new Thread(this).start();
} catch (IOException e) {
- Log.error("Bug");
+ failure("Initialization",e);
}
}
@@ -88,16 +90,18 @@
try {
socket = serverSocket.accept();
-
- new Thread(this).start();
-
- is=socket.getInputStream();
- os=socket.getOutputStream();
- out = new ObjectOutputStream(os);
+
+ //We have our connection to the broker... there's no need to
wait for another connection
+ //new Thread(this).start();
+
+ is = socket.getInputStream();
+ os = socket.getOutputStream();
+ out = new ObjectOutputStream(os);
in = new ObjectInputStream(is);
} catch (IOException e) {
- Log.error("Bug");
+ failure("Initialisation",e);
+ return;
}
while (true) {
@@ -105,9 +109,10 @@
Log.log("Wait for command");
try {
- code=in.read();
+ code=is.read();
} catch (IOException e) {
- Log.error("Bug");
+ failure("Command read",e);
+ return;
}
try {
@@ -137,8 +142,10 @@
try {
os.write(0);
+ os.flush();
} catch (IOException e) {
- Log.error("Bug");
+ failure("Result write",e);
+ return;
}
} catch (Exception e) {
@@ -147,13 +154,27 @@
try {
os.write(1);
+ os.flush();
} catch (IOException e2) {
- Log.error("Bug");
+ failure("Result write",e2);
+ return;
}
}
}
+ }
+
+ void failure(String st,Exception e)
+ {
+ Log.error(st);
+ Log.error(e);
+ }
+
+ public ConnectionReceiver createClient() throws Exception
+ {
+
+ return new
ConnectionReceiverOILClient(InetAddress.getLocalHost(),serverSocket.getLocalPort());
}
// Public --------------------------------------------------------
1.2 +42 -14
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java
Index: ConnectionReceiverOILClient.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ConnectionReceiverOILClient.java 2000/06/15 04:10:00 1.1
+++ ConnectionReceiverOILClient.java 2000/06/15 22:50:31 1.2
@@ -11,43 +11,56 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.IOException;
+import java.io.Serializable;
import java.net.Socket;
+import java.net.InetAddress;
public class ConnectionReceiverOILClient
- implements ConnectionReceiver
+ implements ConnectionReceiver, Serializable
{
static final int RECEIVE = 1;
static final int RECEIVE_MULTIPLE = 2;
static final int DELETE_TEMPORARY_DESTINATION = 3;
static final int CLOSE = 4;
- Socket socket;
- InputStream is;
- OutputStream os;
- ObjectOutputStream out;
- ObjectInputStream in;
+ private transient Socket socket;
+ private transient InputStream is;
+ private transient OutputStream os;
+ private transient ObjectOutputStream out;
+ private transient ObjectInputStream in;
+
+ private int port;
+ private InetAddress addr;
+
+ public ConnectionReceiverOILClient(InetAddress addr,int port)
+ {
+ socket=null;
+ this.port=port;
+ this.addr=addr;
+ }
- ConnectionReceiverOILClient() throws RemoteException
+ void createConnection() throws RemoteException
{
- try {
- Socket socket=new Socket("localhost",12345);
+ try {
+ socket=new Socket(addr,port);
is=socket.getInputStream();
- os=socket.getOutputStream();
+ os=socket.getOutputStream(); //BufferOutputStream instead
in=new ObjectInputStream(is);
out=new ObjectOutputStream(os);
} catch (Exception e) {
Log.error(e);
+ throw new RemoteException("Cannot connect to the
ConnectionReceiver/Server");
}
}
- public void waitAnswer() throws Exception
+ public void waitAnswer() throws RemoteException
{
try {
int val=is.read();
if (val==0) Log.log("Return : OK");
else {
Log.log("Return : Exception");
- throw new Exception("Remote Exception");
+ throw new RemoteException("Remote Exception");
}
} catch (IOException e) {
Log.error("IOException while reading the answer");
@@ -57,37 +70,52 @@
}
public void receive(SpyDestination dest,SpyMessage mes) throws Exception
- {
+ {
+ if (socket==null) createConnection();
os.write(RECEIVE);
out.writeObject(dest);
out.writeObject(mes);
+ os.flush();
waitAnswer();
}
public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
Exception
{
+ if (socket==null) createConnection();
os.write(RECEIVE_MULTIPLE);
out.writeObject(dest);
out.writeObject(mes);
+ os.flush();
waitAnswer();
}
public void deleteTemporaryDestination(SpyDestination dest) throws Exception
{
+ if (socket==null) createConnection();
os.write(DELETE_TEMPORARY_DESTINATION);
out.writeObject(dest);
+ os.flush();
waitAnswer();
}
public void close() throws Exception
{
+ if (socket==null) createConnection();
os.write(CLOSE);
+ os.flush();
waitAnswer();
}
public void setConnection(SpyConnection connection) throws Exception
{
- //SHOULD NOT be there !!!
+ //SHOULD NOT be there !!! - separate the interfaces
}
+
+ public ConnectionReceiver createClient() throws Exception
+ {
+ return null;
+ //SHOULD NOT be there !!! - separate the interfaces
+ }
+
}
1.9 +10 -2
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java
Index: ConnectionReceiverRMIImpl.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- ConnectionReceiverRMIImpl.java 2000/06/15 04:02:28 1.8
+++ ConnectionReceiverRMIImpl.java 2000/06/15 22:50:31 1.9
@@ -23,15 +23,18 @@
import java.util.Hashtable;
import java.util.HashSet;
import java.util.Iterator;
+import org.spydermq.distributed.interfaces.ConnectionReceiver;
/**
* The RMI implementation of the ConnectionReceiver object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
*/
-public class ConnectionReceiverRMIImpl extends UnicastRemoteObject implements
ConnectionReceiverRMI
+public class ConnectionReceiverRMIImpl
+ extends UnicastRemoteObject
+ implements ConnectionReceiverRMI
{
// Attributes ----------------------------------------------------
@@ -152,4 +155,9 @@
connection.deleteTemporaryDestination(dest);
}
+ public ConnectionReceiver createClient() throws JMSException
+ {
+ return this;
+ }
+
}