User: hiram
Date: 00/12/21 14:33:58
Modified: src/java/org/spydermq/distributed/server
ConnectionReceiverOIL.java
ConnectionReceiverRMIImpl.java
ConnectionReceiverUIL.java
Log:
Added ConnectionConsumer so that work on the ASF part of
spyderMQ can start.
Revision Changes Path
1.12 +68 -80
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
Index: ConnectionReceiverOIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- ConnectionReceiverOIL.java 2000/12/12 05:58:51 1.11
+++ ConnectionReceiverOIL.java 2000/12/21 22:33:58 1.12
@@ -17,14 +17,14 @@
import org.spydermq.SpyQueueSession;
import org.spydermq.Log;
import org.spydermq.NoReceiverException;
-import org.spydermq.SpyMessageConsumer;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
+import org.spydermq.SpyConsumer;
import java.util.Hashtable;
import java.util.HashSet;
import java.util.Iterator;
-import java.rmi.RemoteException;
+import java.rmi.RemoteException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.InetAddress;
@@ -42,11 +42,9 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.11 $
+ * @version $Revision: 1.12 $
*/
-public class ConnectionReceiverOIL
- implements Runnable, ConnectionReceiverSetup
-{
+public class ConnectionReceiverOIL implements Runnable, ConnectionReceiverSetup {
// Attributes ----------------------------------------------------
//A link on my connection
@@ -55,100 +53,95 @@
private boolean closed;
// Constructor ---------------------------------------------------
-
- public ConnectionReceiverOIL()
- {
- closed=false;
+
+ public ConnectionReceiverOIL() {
+ closed = false;
exportObject();
}
// Internals -----------------------------------------------------
-
+
static final int RECEIVE = 1;
static final int RECEIVE_MULTIPLE = 2;
static final int DELETE_TEMPORARY_DESTINATION = 3;
- static final int CLOSE = 4;
-
+ static final int CLOSE = 4;
+
private ServerSocket serverSocket;
- void exportObject()
- {
+ void exportObject() {
try {
serverSocket = new ServerSocket(0);
Thread thread = new Thread(this, "ConnectionReceiverOIL
Server");
thread.setDaemon(true);
thread.start();
} catch (IOException e) {
- failure("Initialization",e);
+ failure("Initialization", e);
}
}
-
- public void run()
- {
+
+ public void run() {
Socket socket = null;
int code = 0;
- ObjectOutputStream out=null;
- ObjectInputStream in=null;
-
+ ObjectOutputStream out = null;
+ ObjectInputStream in = null;
+
try {
-
+
socket = serverSocket.accept();
out = new ObjectOutputStream(new
BufferedOutputStream(socket.getOutputStream()));
out.flush();
in = new ObjectInputStream(new
BufferedInputStream(socket.getInputStream()));
} catch (IOException e) {
- failure("Initialisation",e);
+ failure("Initialisation", e);
return;
}
while (true) {
try {
- code=in.readByte();
+ code = in.readByte();
} catch (IOException e) {
- failure("Command read",e);
+ failure("Command read", e);
e.printStackTrace();
return;
}
-
+
try {
-
- switch (code)
- {
- case RECEIVE:
-
receive((SpyDestination)in.readObject(),(SpyMessage)in.readObject());
+ switch (code) {
+ case RECEIVE :
+ receive((SpyDestination)
in.readObject(), (SpyMessage) in.readObject());
break;
- case RECEIVE_MULTIPLE:
- SpyDestination
dest=(SpyDestination)in.readObject();
- int nb=in.readInt();
- receiveMultiple(dest,nb,in);
+ case RECEIVE_MULTIPLE :
+ SpyDestination dest = (SpyDestination)
in.readObject();
+ int nb = in.readInt();
+ receiveMultiple(dest, nb, in);
break;
- case DELETE_TEMPORARY_DESTINATION:
-
deleteTemporaryDestination((SpyDestination)in.readObject());
+ case DELETE_TEMPORARY_DESTINATION :
+
deleteTemporaryDestination((SpyDestination) in.readObject());
break;
- case CLOSE:
- close();
+ case CLOSE :
+ close();
break;
- default:
+ default :
throw new RemoteException("Bad method
code !");
}
-
+
//Everthing was OK
-
+
try {
out.writeByte(0);
out.flush();
} catch (IOException e) {
- failure("Result write",e);
- return;
+ failure("Result write", e);
+ return;
}
-
+
} catch (Exception e) {
try {
- if( e instanceof NoReceiverException ) {
+ if (e instanceof NoReceiverException) {
out.writeByte(2);
} else {
out.writeByte(1);
@@ -157,33 +150,30 @@
out.reset();
out.flush();
} catch (IOException e2) {
- failure("Result write",e2);
- return;
+ failure("Result write", e2);
+ return;
}
-
+
}
}
}
-
- void failure(String st,Exception e)
- {
- Log.error("Closing socket: "+st);
+
+ void failure(String st, Exception e) {
+ Log.error("Closing socket: " + st);
Log.error(e);
}
-
+
// Public --------------------------------------------------------
- public void setConnection(SpyConnection connection)
- {
- this.connection=connection;
+ public void setConnection(SpyConnection connection) {
+ this.connection = connection;
}
-
- public ConnectionReceiver createClient() throws Exception
- {
- return new
ConnectionReceiverOILClient(InetAddress.getLocalHost(),serverSocket.getLocalPort());
+
+ public ConnectionReceiver createClient() throws Exception {
+ return new ConnectionReceiverOILClient(InetAddress.getLocalHost(),
serverSocket.getLocalPort());
}
-
+
// ---
//<DEBUG>
@@ -192,7 +182,7 @@
{
connection.rec++;
}
-
+
public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
JMSException
{
connection.rec++;
@@ -210,7 +200,7 @@
if (connection instanceof SpyTopicConnection) {
//Get the set of subscribers for this Topic
- SpyMessageConsumer consumers[] = connection.getConsumers(dest);
+ SpyConsumer consumers[] = connection.getConsumers(dest);
for (int i = 0; i < consumers.length; i++) {
@@ -218,7 +208,7 @@
consumers[i].addMessage(mes);
//There is work to do...
- consumers[i].session.mutex.notifyLock();
+ consumers[i].processMessages();
}
} else {
@@ -227,18 +217,18 @@
if (connection.modeStop)
throw new JMSException("This connection is stopped !");
- SpyMessageConsumer consumer =
connection.pickListeningConsumer(dest);
+ SpyConsumer consumer = connection.pickListeningConsumer(dest);
if (consumer == null)
throw new NoReceiverException("There are no listening
sessions for this destination !");
//Try with this sessionQueue
Log.log("Dispatching to SessionQueue: " + mes);
- ((org.spydermq.SpyQueueReceiver)consumer).dispatchMessage( mes
);
+ consumer.addMessage(mes);
+ consumer.processMessages();
}
}
-
public void receiveMultiple(SpyDestination dest, int nb, ObjectInputStream in)
throws Exception {
if (closed)
@@ -249,7 +239,7 @@
if (connection instanceof SpyTopicConnection) {
//Get the set of subscribers for this Topic
- SpyMessageConsumer consumers[] = connection.getConsumers(dest);
+ SpyConsumer consumers[] = connection.getConsumers(dest);
for (int val = 0; val < nb; val++) {
SpyMessage mes = (SpyMessage) in.readObject();
@@ -260,24 +250,22 @@
consumers[i].addMessage(mes);
//There is work to do...
- consumers[i].session.mutex.notifyLock();
+ consumers[i].processMessages();
}
}
} else {
throw new Exception("Multiple dispatch for a Queue");
}
- }
-
-
- public void close() throws Exception
- {
- closed=true;
}
+ public void close() throws Exception {
+ closed = true;
+ }
+
//One TemporaryDestination has been deleted
- public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
+ public void deleteTemporaryDestination(SpyDestination dest) throws
JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
connection.deleteTemporaryDestination(dest);
}
1.14 +10 -8
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java
Index: ConnectionReceiverRMIImpl.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -r1.13 -r1.14
--- ConnectionReceiverRMIImpl.java 2000/12/12 05:58:50 1.13
+++ ConnectionReceiverRMIImpl.java 2000/12/21 22:33:58 1.14
@@ -17,9 +17,9 @@
import org.spydermq.SpyQueueSession;
import org.spydermq.Log;
import org.spydermq.NoReceiverException;
-import org.spydermq.SpyMessageConsumer;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
+import org.spydermq.SpyConsumer;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
@@ -27,13 +27,14 @@
import java.util.HashSet;
import java.util.Iterator;
+
/**
* The RMI implementation of the ConnectionReceiver object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.13 $
+ * @version $Revision: 1.14 $
*/
public class ConnectionReceiverRMIImpl
extends UnicastRemoteObject
@@ -71,7 +72,7 @@
if (connection instanceof SpyTopicConnection) {
//Get the set of subscribers for this Topic
- SpyMessageConsumer consumers[] = connection.getConsumers(dest);
+ SpyConsumer consumers[] = connection.getConsumers(dest);
for (int i = 0; i < consumers.length; i++) {
@@ -79,7 +80,7 @@
consumers[i].addMessage(mes);
//There is work to do...
- consumers[i].session.mutex.notifyLock();
+ consumers[i].processMessages();
}
} else {
@@ -88,13 +89,14 @@
if (connection.modeStop)
throw new JMSException("This connection is stopped !");
- SpyMessageConsumer consumer =
connection.pickListeningConsumer(dest);
+ SpyConsumer consumer = connection.pickListeningConsumer(dest);
if (consumer == null)
throw new NoReceiverException("There are no listening
sessions for this destination !");
//Try with this sessionQueue
Log.log("Dispatching to SessionQueue: " + mes);
- ((org.spydermq.SpyQueueReceiver)
consumer).dispatchMessage(mes);
+ consumer.addMessage(mes);
+ consumer.processMessages();
}
}
@@ -109,7 +111,7 @@
if (connection instanceof SpyTopicConnection) {
//Get the set of subscribers for this Topic
- SpyMessageConsumer consumers[] = connection.getConsumers(dest);
+ SpyConsumer consumers[] = connection.getConsumers(dest);
for(int i=0;i<mes.length;i++) {
@@ -119,7 +121,7 @@
consumers[j].addMessage(mes[i]);
//There is work to do...
- consumers[j].session.mutex.notifyLock();
+ consumers[j].processMessages();
}
}
} else {
1.5 +84 -96
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUIL.java
Index: ConnectionReceiverUIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUIL.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- ConnectionReceiverUIL.java 2000/11/29 17:06:36 1.4
+++ ConnectionReceiverUIL.java 2000/12/21 22:33:58 1.5
@@ -8,11 +8,9 @@
import javax.jms.Destination;
import javax.jms.JMSException;
-import org.spydermq.SpyConnection;
+import org.spydermq.SpyConnection;
import org.spydermq.SpyMessage;
-
-
import org.spydermq.SpyDestination;
import org.spydermq.SpyTopicConnection;
import org.spydermq.SpyQueueSession;
@@ -20,10 +18,12 @@
import org.spydermq.NoReceiverException;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
+import org.spydermq.SpyConsumer;
+
import java.util.Hashtable;
import java.util.HashSet;
import java.util.Iterator;
-import java.rmi.RemoteException;
+import java.rmi.RemoteException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.InetAddress;
@@ -35,18 +35,16 @@
import java.io.ObjectOutputStream;
import java.io.IOException;
-import org.spydermq.SpyMessageConsumer;
+
/**
* The OIL implementation of the ConnectionReceiver object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
-public class ConnectionReceiverUIL
- implements Runnable, ConnectionReceiverSetup
-{
+public class ConnectionReceiverUIL implements Runnable, ConnectionReceiverSetup {
// Attributes ----------------------------------------------------
//A link on my connection
@@ -55,104 +53,98 @@
private boolean closed;
// Constructor ---------------------------------------------------
-
- public ConnectionReceiverUIL()
- {
- closed=false;
+
+ public ConnectionReceiverUIL() {
+ closed = false;
}
// Internals -----------------------------------------------------
-
+
static final int RECEIVE = 1;
static final int RECEIVE_MULTIPLE = 2;
static final int DELETE_TEMPORARY_DESTINATION = 3;
- static final int CLOSE = 4;
-
-
+ static final int CLOSE = 4;
- void exportObject()
- {
- Thread thread = new Thread(this, "ConnectionReceiverUIL");
- thread.setDaemon(true);
- thread.start();
+ void exportObject() {
+ Thread thread = new Thread(this, "ConnectionReceiverUIL");
+ thread.setDaemon(true);
+ thread.start();
}
-
- public void run()
- {
+
+ public void run() {
Socket socket = null;
int code = 0;
+
+ ObjectOutputStream out = null;
+ ObjectInputStream in = null;
- ObjectOutputStream out=null;
- ObjectInputStream in=null;
-
try {
- DistributedJMSServerUILClient uilClient =
(DistributedJMSServerUILClient)connection.getProvider();
+ DistributedJMSServerUILClient uilClient =
(DistributedJMSServerUILClient) connection.getProvider();
out = new ObjectOutputStream(new
BufferedOutputStream(uilClient.mSocket.getOutputStream(2)));
out.flush();
in = new ObjectInputStream(new
BufferedInputStream(uilClient.mSocket.getInputStream(2)));
-
+
} catch (IOException e) {
- failure("Initialisation",e);
+ failure("Initialisation", e);
return;
}
while (!closed) {
try {
- code=in.readByte();
+ code = in.readByte();
} catch (IOException e) {
- if( closed )
+ if (closed)
break;
-
- failure("Command read",e);
+
+ failure("Command read", e);
e.printStackTrace();
break;
}
-
+
try {
-
- switch (code)
- {
- case RECEIVE:
-
receive((SpyDestination)in.readObject(),(SpyMessage)in.readObject());
+ switch (code) {
+ case RECEIVE :
+ receive((SpyDestination)
in.readObject(), (SpyMessage) in.readObject());
break;
- case RECEIVE_MULTIPLE:
- SpyDestination
dest=(SpyDestination)in.readObject();
- int nb=in.readInt();
- receiveMultiple(dest,nb,in);
+ case RECEIVE_MULTIPLE :
+ SpyDestination dest = (SpyDestination)
in.readObject();
+ int nb = in.readInt();
+ receiveMultiple(dest, nb, in);
break;
- case DELETE_TEMPORARY_DESTINATION:
-
deleteTemporaryDestination((SpyDestination)in.readObject());
+ case DELETE_TEMPORARY_DESTINATION :
+
deleteTemporaryDestination((SpyDestination) in.readObject());
break;
- case CLOSE:
- close();
+ case CLOSE :
+ close();
break;
- default:
+ default :
throw new RemoteException("Bad method
code !");
}
-
+
//Everthing was OK
-
+
try {
out.writeByte(0);
out.flush();
} catch (IOException e) {
- if( closed )
- break;;
-
- failure("Result write",e);
+ if (closed)
+ break;
+ ;
+
+ failure("Result write", e);
break;
}
-
+
} catch (Exception e) {
-
- if( closed )
+
+ if (closed)
break;
try {
- if( e instanceof NoReceiverException ) {
+ if (e instanceof NoReceiverException) {
out.writeByte(2);
out.writeObject(e.getMessage());
} else {
@@ -162,46 +154,43 @@
out.reset();
out.flush();
} catch (IOException e2) {
- failure("Result write",e2);
- break;
+ failure("Result write", e2);
+ break;
}
-
+
}
}
-
+
try {
Log.log("Closing receiver connections.");
out.close();
in.close();
- } catch ( IOException e ) {
+ } catch (IOException e) {
Log.log("Error whle closing receiver connections ");
Log.log(e);
- return;
+ return;
}
}
-
- void failure(String st,Exception e)
- {
- Log.error("Closing socket: "+st);
+
+ void failure(String st, Exception e) {
+ Log.error("Closing socket: " + st);
Log.error(e);
}
-
+
// Public --------------------------------------------------------
- public void setConnection(SpyConnection connection)
- {
- boolean export = this.connection==null;
- this.connection=connection;
- if( export )
+ public void setConnection(SpyConnection connection) {
+ boolean export = this.connection == null;
+ this.connection = connection;
+ if (export)
exportObject();
}
-
- public ConnectionReceiver createClient() throws Exception
- {
+
+ public ConnectionReceiver createClient() throws Exception {
return new ConnectionReceiverUILClient();
}
-
+
//A message has arrived for this Connection, We have to dispatch it to the
sessions
public void receive(SpyDestination dest, SpyMessage mes) throws JMSException {
if (closed)
@@ -212,7 +201,7 @@
if (connection instanceof SpyTopicConnection) {
//Get the set of subscribers for this Topic
- SpyMessageConsumer consumers[] = connection.getConsumers(dest);
+ SpyConsumer consumers[] = connection.getConsumers(dest);
for (int i = 0; i < consumers.length; i++) {
@@ -220,7 +209,7 @@
consumers[i].addMessage(mes);
//There is work to do...
- consumers[i].session.mutex.notifyLock();
+ consumers[i].processMessages();
}
} else {
@@ -229,16 +218,17 @@
if (connection.modeStop)
throw new JMSException("This connection is stopped !");
- SpyMessageConsumer consumer =
connection.pickListeningConsumer(dest);
+ SpyConsumer consumer = connection.pickListeningConsumer(dest);
if (consumer == null)
throw new NoReceiverException("There are no listening
sessions for this destination !");
//Try with this sessionQueue
Log.log("Dispatching to SessionQueue: " + mes);
- ((org.spydermq.SpyQueueReceiver)
consumer).dispatchMessage(mes);
+ consumer.addMessage(mes);
+ consumer.processMessages();
}
- }
+ }
public void receiveMultiple(SpyDestination dest, int nb, ObjectInputStream in)
throws Exception {
if (closed)
@@ -249,7 +239,7 @@
if (connection instanceof SpyTopicConnection) {
//Get the set of subscribers for this Topic
- SpyMessageConsumer consumers[] = connection.getConsumers(dest);
+ SpyConsumer consumers[] = connection.getConsumers(dest);
for (int val = 0; val < nb; val++) {
SpyMessage mes = (SpyMessage) in.readObject();
@@ -260,24 +250,22 @@
consumers[i].addMessage(mes);
//There is work to do...
- consumers[i].session.mutex.notifyLock();
+ consumers[i].processMessages();
}
}
} else {
throw new Exception("Multiple dispatch for a Queue");
}
- }
-
-
- public void close() throws Exception
- {
- closed=true;
}
+ public void close() throws Exception {
+ closed = true;
+ }
+
//One TemporaryDestination has been deleted
- public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
+ public void deleteTemporaryDestination(SpyDestination dest) throws
JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
connection.deleteTemporaryDestination(dest);
}