User: hiram
Date: 00/12/23 07:48:23
Modified: src/java/org/spydermq/distributed/server
ConnectionReceiverOIL.java
ConnectionReceiverOILClient.java
ConnectionReceiverRMI.java
ConnectionReceiverRMIImpl.java
ConnectionReceiverUIL.java
ConnectionReceiverUILClient.java
DistributedJMSServerOIL.java
DistributedJMSServerOILClient.java
DistributedJMSServerRMI.java
DistributedJMSServerRMIImpl.java
DistributedJMSServerUIL.java
DistributedJMSServerUILClient.java
Log:
These changes were done to add the following features:
The selector is now evaluated at the server side.
The infrastructure has been laid for durable topic subscriptions.
The QueueBrowser has been implemented.
Queues now can have a Selector.
Revision Changes Path
1.13 +11 -113
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
Index: ConnectionReceiverOIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- ConnectionReceiverOIL.java 2000/12/21 22:33:58 1.12
+++ ConnectionReceiverOIL.java 2000/12/23 15:48:20 1.13
@@ -42,7 +42,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.13 $
*/
public class ConnectionReceiverOIL implements Runnable, ConnectionReceiverSetup {
// Attributes ----------------------------------------------------
@@ -51,23 +51,19 @@
private SpyConnection connection;
//Is my connection closed ?
private boolean closed;
+ private ServerSocket serverSocket;
- // Constructor ---------------------------------------------------
+ // Internals -----------------------------------------------------
+ static final int m_close = 2;
+ static final int m_deleteTemporaryDestination = 1;
+ static final int m_receive = 3;
+ // Constructor ---------------------------------------------------
public ConnectionReceiverOIL() {
closed = false;
exportObject();
}
- // Internals -----------------------------------------------------
-
- static final int RECEIVE = 1;
- static final int RECEIVE_MULTIPLE = 2;
- static final int DELETE_TEMPORARY_DESTINATION = 3;
- static final int CLOSE = 4;
-
- private ServerSocket serverSocket;
-
void exportObject() {
try {
serverSocket = new ServerSocket(0);
@@ -110,18 +106,13 @@
try {
switch (code) {
- case RECEIVE :
- receive((SpyDestination)
in.readObject(), (SpyMessage) in.readObject());
- break;
- case RECEIVE_MULTIPLE :
- SpyDestination dest = (SpyDestination)
in.readObject();
- int nb = in.readInt();
- receiveMultiple(dest, nb, in);
+ case m_receive:
+
connection.deliver((org.spydermq.ReceiveRequest[])in.readObject());
break;
- case DELETE_TEMPORARY_DESTINATION :
-
deleteTemporaryDestination((SpyDestination) in.readObject());
+ case m_deleteTemporaryDestination:
+
connection.deleteTemporaryDestination((SpyDestination) in.readObject());
break;
- case CLOSE :
+ case m_close :
close();
break;
default :
@@ -164,8 +155,6 @@
Log.error(e);
}
- // Public --------------------------------------------------------
-
public void setConnection(SpyConnection connection) {
this.connection = connection;
}
@@ -174,100 +163,9 @@
return new ConnectionReceiverOILClient(InetAddress.getLocalHost(),
serverSocket.getLocalPort());
}
- // ---
-
- //<DEBUG>
-
- /*public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
- {
- connection.rec++;
- }
-
- public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
JMSException
- {
- connection.rec++;
- }*/
-
- //</DEBUG>
-
- //A message has arrived for this Connection, We have to dispatch it to the
sessions
- public void receive(SpyDestination dest, SpyMessage mes) throws JMSException {
- if (closed)
- throw new IllegalStateException("The connection is closed");
-
- Log.log("ConnectionReceiver: Receive(Destination=" + dest.toString() +
",Mes=" + mes.toString() + ")");
-
- if (connection instanceof SpyTopicConnection) {
-
- //Get the set of subscribers for this Topic
- SpyConsumer consumers[] = connection.getConsumers(dest);
-
- for (int i = 0; i < consumers.length; i++) {
-
- //add the new message to the consumer's queue
- consumers[i].addMessage(mes);
-
- //There is work to do...
- consumers[i].processMessages();
- }
-
- } else {
-
- //Find one session waiting for this Queue
- if (connection.modeStop)
- throw new JMSException("This connection is stopped !");
-
- SpyConsumer consumer = connection.pickListeningConsumer(dest);
- if (consumer == null)
- throw new NoReceiverException("There are no listening
sessions for this destination !");
-
- //Try with this sessionQueue
- Log.log("Dispatching to SessionQueue: " + mes);
- consumer.addMessage(mes);
- consumer.processMessages();
-
- }
-
- }
-
- public void receiveMultiple(SpyDestination dest, int nb, ObjectInputStream in)
throws Exception {
- if (closed)
- throw new IllegalStateException("The connection is closed");
-
- Log.log("ConnectionReceiver: ReceiveMultiple()");
-
- if (connection instanceof SpyTopicConnection) {
-
- //Get the set of subscribers for this Topic
- SpyConsumer consumers[] = connection.getConsumers(dest);
-
- for (int val = 0; val < nb; val++) {
- SpyMessage mes = (SpyMessage) in.readObject();
-
- for (int i = 0; i < consumers.length; i++) {
-
- //add the new message to the consumer's queue
- consumers[i].addMessage(mes);
-
- //There is work to do...
- consumers[i].processMessages();
- }
- }
- } else {
- throw new Exception("Multiple dispatch for a Queue");
- }
- }
-
public void close() throws Exception {
closed = true;
}
- //One TemporaryDestination has been deleted
- public void deleteTemporaryDestination(SpyDestination dest) throws
JMSException {
- if (closed)
- throw new IllegalStateException("The connection is closed");
-
- connection.deleteTemporaryDestination(dest);
- }
}
1.13 +17 -28
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java
Index: ConnectionReceiverOILClient.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- ConnectionReceiverOILClient.java 2000/12/12 05:58:50 1.12
+++ ConnectionReceiverOILClient.java 2000/12/23 15:48:20 1.13
@@ -25,28 +25,29 @@
import java.net.InetAddress;
import java.net.SocketException;
+import org.spydermq.ReceiveRequest;
+
/**
* The UIL implementation of the ConnectionReceiver object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.13 $
*/
public class ConnectionReceiverOILClient
implements ConnectionReceiver, Serializable
{
- static final int RECEIVE = 1;
- static final int RECEIVE_MULTIPLE = 2;
- static final int DELETE_TEMPORARY_DESTINATION = 3;
- static final int CLOSE = 4;
-
+
private transient Socket socket;
private transient ObjectOutputStream out;
private transient ObjectInputStream in;
-
private int port;
private InetAddress addr;
+
+ static final int m_close = 2;
+ static final int m_deleteTemporaryDestination = 1;
+ static final int m_receive = 3;
public ConnectionReceiverOILClient(InetAddress addr,int port)
{
@@ -94,30 +95,11 @@
throw throwException;
}
- public void receive(SpyDestination dest,SpyMessage mes) throws Exception
- {
- checkSocket();
- out.writeByte(RECEIVE);
- out.writeObject(dest);
- out.writeObject(mes);
- waitAnswer();
- }
-
- public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
Exception
- {
- checkSocket();
- out.writeByte(RECEIVE_MULTIPLE);
- out.writeObject(dest);
- out.writeInt(mes.length);
- for(int i=0;i<mes.length;i++)
- out.writeObject(mes[i]);
- waitAnswer();
- }
public void deleteTemporaryDestination(SpyDestination dest) throws Exception
{
checkSocket();
- out.writeByte(DELETE_TEMPORARY_DESTINATION);
+ out.writeByte(m_deleteTemporaryDestination);
out.writeObject(dest);
waitAnswer();
}
@@ -125,7 +107,7 @@
public void close() throws Exception
{
checkSocket();
- out.writeByte(CLOSE);
+ out.writeByte(m_close);
waitAnswer();
}
@@ -134,4 +116,11 @@
if (socket==null) createConnection();
}
+ public void receive(ReceiveRequest messages[]) throws Exception
+ {
+ checkSocket();
+ out.writeByte(m_receive);
+ out.writeObject(messages);
+ waitAnswer();
+ }
}
1.5 +10 -7
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMI.java
Index: ConnectionReceiverRMI.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMI.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- ConnectionReceiverRMI.java 2000/12/12 05:58:52 1.4
+++ ConnectionReceiverRMI.java 2000/12/23 15:48:20 1.5
@@ -9,29 +9,32 @@
import javax.jms.Destination;
import javax.jms.JMSException;
-import org.spydermq.SpyMessage;
+
import org.spydermq.SpyDestination;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import java.rmi.Remote;
import java.rmi.RemoteException;
+import org.spydermq.ReceiveRequest;
+
/**
* The RMI interface of the ConnectionReceiver object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public interface ConnectionReceiverRMI
extends ConnectionReceiver, Remote
{
- // Public --------------------------------------------------------
- //A message has arrived for the Connection
- public void receive(SpyDestination b,SpyMessage c) throws RemoteException,
JMSException;
- //Messages have arrived for the Connection
- public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
RemoteException, JMSException;
+
//One TemporaryDestination has been deleted
public void deleteTemporaryDestination(SpyDestination dest) throws
RemoteException, JMSException;
+
+ // Public --------------------------------------------------------
+
+ //A message has arrived for the Connection
+ public void receive(ReceiveRequest messages[]) throws RemoteException,
Exception;
}
1.15 +11 -77
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java
Index: ConnectionReceiverRMIImpl.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -r1.14 -r1.15
--- ConnectionReceiverRMIImpl.java 2000/12/21 22:33:58 1.14
+++ ConnectionReceiverRMIImpl.java 2000/12/23 15:48:20 1.15
@@ -10,31 +10,22 @@
import javax.jms.JMSException;
import org.spydermq.SpyConnection;
-import org.spydermq.SpyMessage;
-import org.spydermq.SpySession;
import org.spydermq.SpyDestination;
-import org.spydermq.SpyTopicConnection;
-import org.spydermq.SpyQueueSession;
import org.spydermq.Log;
-import org.spydermq.NoReceiverException;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
-import org.spydermq.SpyConsumer;
+import org.spydermq.ReceiveRequest;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
-import java.util.Hashtable;
-import java.util.HashSet;
-import java.util.Iterator;
-
/**
* The RMI implementation of the ConnectionReceiver object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.14 $
+ * @version $Revision: 1.15 $
*/
public class ConnectionReceiverRMIImpl
extends UnicastRemoteObject
@@ -62,73 +53,6 @@
this.connection=connection;
}
- //A message has arrived for this Connection, We have to dispatch it to the
sessions
- public void receive(SpyDestination dest, SpyMessage mes) throws JMSException {
- if (closed)
- throw new IllegalStateException("The connection is closed");
-
- Log.log("ConnectionReceiver: Receive(Destination=" + dest.toString() +
",Mes=" + mes.toString() + ")");
-
- if (connection instanceof SpyTopicConnection) {
-
- //Get the set of subscribers for this Topic
- SpyConsumer consumers[] = connection.getConsumers(dest);
-
- for (int i = 0; i < consumers.length; i++) {
-
- //add the new message to the consumer's queue
- consumers[i].addMessage(mes);
-
- //There is work to do...
- consumers[i].processMessages();
- }
-
- } else {
-
- //Find one session waiting for this Queue
- if (connection.modeStop)
- throw new JMSException("This connection is stopped !");
-
- SpyConsumer consumer = connection.pickListeningConsumer(dest);
- if (consumer == null)
- throw new NoReceiverException("There are no listening
sessions for this destination !");
-
- //Try with this sessionQueue
- Log.log("Dispatching to SessionQueue: " + mes);
- consumer.addMessage(mes);
- consumer.processMessages();
-
- }
- }
-
- public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
JMSException
- {
- if (closed)
- throw new IllegalStateException("The connection is closed");
-
- Log.log("ConnectionReceiver: ReceiveMultiple()");
-
- if (connection instanceof SpyTopicConnection) {
-
- //Get the set of subscribers for this Topic
- SpyConsumer consumers[] = connection.getConsumers(dest);
-
- for(int i=0;i<mes.length;i++) {
-
- for (int j = 0; j < consumers.length; j++) {
-
- //add the new message to the consumer's queue
- consumers[j].addMessage(mes[i]);
-
- //There is work to do...
- consumers[j].processMessages();
- }
- }
- } else {
- throw new JMSException("Multiple dispatch for a Queue");
- }
- }
-
public void close() throws Exception
{
closed=true;
@@ -147,4 +71,14 @@
return this;
}
+ public void receive(ReceiveRequest messages[]) throws Exception {
+
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+
+ Log.log("ConnectionReceiver: Receive(ReceiveRequest[" +
messages.length +"])");
+
+ connection.deliver(messages);
+
+ }
}
1.6 +8 -92
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUIL.java
Index: ConnectionReceiverUIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUIL.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- ConnectionReceiverUIL.java 2000/12/21 22:33:58 1.5
+++ ConnectionReceiverUIL.java 2000/12/23 15:48:21 1.6
@@ -42,10 +42,13 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class ConnectionReceiverUIL implements Runnable, ConnectionReceiverSetup {
// Attributes ----------------------------------------------------
+ static final int m_close = 2;
+ static final int m_deleteTemporaryDestination = 1;
+ static final int m_receive = 3;
//A link on my connection
private SpyConnection connection;
@@ -58,13 +61,6 @@
closed = false;
}
- // Internals -----------------------------------------------------
-
- static final int RECEIVE = 1;
- static final int RECEIVE_MULTIPLE = 2;
- static final int DELETE_TEMPORARY_DESTINATION = 3;
- static final int CLOSE = 4;
-
void exportObject() {
Thread thread = new Thread(this, "ConnectionReceiverUIL");
thread.setDaemon(true);
@@ -106,18 +102,13 @@
try {
switch (code) {
- case RECEIVE :
- receive((SpyDestination)
in.readObject(), (SpyMessage) in.readObject());
+ case m_receive:
+
connection.deliver((org.spydermq.ReceiveRequest[])in.readObject());
break;
- case RECEIVE_MULTIPLE :
- SpyDestination dest = (SpyDestination)
in.readObject();
- int nb = in.readInt();
- receiveMultiple(dest, nb, in);
+ case m_deleteTemporaryDestination:
+
connection.deleteTemporaryDestination((SpyDestination) in.readObject());
break;
- case DELETE_TEMPORARY_DESTINATION :
-
deleteTemporaryDestination((SpyDestination) in.readObject());
- break;
- case CLOSE :
+ case m_close :
close();
break;
default :
@@ -191,83 +182,8 @@
return new ConnectionReceiverUILClient();
}
- //A message has arrived for this Connection, We have to dispatch it to the
sessions
- public void receive(SpyDestination dest, SpyMessage mes) throws JMSException {
- if (closed)
- throw new IllegalStateException("The connection is closed");
-
- Log.log("ConnectionReceiver: Receive(Destination=" + dest.toString() +
",Mes=" + mes.toString() + ")");
-
- if (connection instanceof SpyTopicConnection) {
-
- //Get the set of subscribers for this Topic
- SpyConsumer consumers[] = connection.getConsumers(dest);
-
- for (int i = 0; i < consumers.length; i++) {
-
- //add the new message to the consumer's queue
- consumers[i].addMessage(mes);
-
- //There is work to do...
- consumers[i].processMessages();
- }
-
- } else {
-
- //Find one session waiting for this Queue
- if (connection.modeStop)
- throw new JMSException("This connection is stopped !");
-
- SpyConsumer consumer = connection.pickListeningConsumer(dest);
- if (consumer == null)
- throw new NoReceiverException("There are no listening
sessions for this destination !");
-
- //Try with this sessionQueue
- Log.log("Dispatching to SessionQueue: " + mes);
- consumer.addMessage(mes);
- consumer.processMessages();
-
- }
- }
-
- public void receiveMultiple(SpyDestination dest, int nb, ObjectInputStream in)
throws Exception {
- if (closed)
- throw new IllegalStateException("The connection is closed");
-
- Log.log("ConnectionReceiver: ReceiveMultiple()");
-
- if (connection instanceof SpyTopicConnection) {
-
- //Get the set of subscribers for this Topic
- SpyConsumer consumers[] = connection.getConsumers(dest);
-
- for (int val = 0; val < nb; val++) {
- SpyMessage mes = (SpyMessage) in.readObject();
-
- for (int i = 0; i < consumers.length; i++) {
-
- //add the new message to the consumer's queue
- consumers[i].addMessage(mes);
-
- //There is work to do...
- consumers[i].processMessages();
- }
- }
- } else {
- throw new Exception("Multiple dispatch for a Queue");
- }
- }
-
public void close() throws Exception {
closed = true;
- }
-
- //One TemporaryDestination has been deleted
- public void deleteTemporaryDestination(SpyDestination dest) throws
JMSException {
- if (closed)
- throw new IllegalStateException("The connection is closed");
-
- connection.deleteTemporaryDestination(dest);
}
}
1.5 +24 -31
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUILClient.java
Index: ConnectionReceiverUILClient.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUILClient.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- ConnectionReceiverUILClient.java 2000/12/12 05:58:51 1.4
+++ ConnectionReceiverUILClient.java 2000/12/23 15:48:21 1.5
@@ -31,19 +31,20 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
-import org.spydermq.multiplexor.SocketMultiplexor;public class
ConnectionReceiverUILClient implements ConnectionReceiver, Serializable
+import org.spydermq.multiplexor.SocketMultiplexor;
+import org.spydermq.ReceiveRequest;
+
+public class ConnectionReceiverUILClient implements ConnectionReceiver, Serializable
{
- static final int RECEIVE = 1;
- static final int RECEIVE_MULTIPLE = 2;
- static final int DELETE_TEMPORARY_DESTINATION = 3;
- static final int CLOSE = 4;
-
private transient ObjectOutputStream out;
private transient ObjectInputStream in;
transient SocketMultiplexor mSocket;
+ static final int m_close = 2;
+ static final int m_deleteTemporaryDestination = 1;
+ static final int m_receive = 3;
void createConnection() throws RemoteException
{
@@ -81,40 +82,20 @@
if( throwException != null )
throw throwException;
- }
-
- public void receive(SpyDestination dest,SpyMessage mes) throws Exception
- {
- if (out==null) createConnection();
- out.writeByte(RECEIVE);
- out.writeObject(dest);
- out.writeObject(mes);
- waitAnswer();
- }
-
- public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
Exception
- {
- if (out==null) createConnection();
- out.writeByte(RECEIVE_MULTIPLE);
- out.writeObject(dest);
- out.writeInt(mes.length);
- for(int i=0;i<mes.length;i++)
- out.writeObject(mes[i]);
- waitAnswer();
}
public void deleteTemporaryDestination(SpyDestination dest) throws Exception
{
- if (out==null) createConnection();
- out.writeByte(DELETE_TEMPORARY_DESTINATION);
+ checkSocket();
+ out.writeByte(m_deleteTemporaryDestination);
out.writeObject(dest);
waitAnswer();
}
public void close() throws Exception
{
- if (out==null) createConnection();
- out.writeByte(CLOSE);
+ checkSocket();
+ out.writeByte(m_close);
waitAnswer();
}
@@ -122,4 +103,16 @@
{
}
+ protected void checkSocket() throws Exception {
+ if (out == null)
+ createConnection();
+ }
+
+ public void receive(ReceiveRequest messages[]) throws Exception
+ {
+ checkSocket();
+ out.writeByte(m_receive);
+ out.writeObject(messages);
+ waitAnswer();
+ }
}
1.9 +47 -38
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java
Index: DistributedJMSServerOIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- DistributedJMSServerOIL.java 2000/12/19 06:43:37 1.8
+++ DistributedJMSServerOIL.java 2000/12/23 15:48:21 1.9
@@ -17,10 +17,11 @@
import org.spydermq.SpyDestination;
import org.spydermq.SpyDistributedConnection;
import org.spydermq.Log;
-import org.spydermq.SpyAcknowledgementItem;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
-
+import org.spydermq.TransactionRequest;
+import org.spydermq.AcknowledgementRequest;
+import org.spydermq.Subscription;
import java.rmi.RemoteException;
import java.net.ServerSocket;
@@ -32,15 +33,13 @@
import java.io.BufferedInputStream;
import java.io.IOException;
-import org.spydermq.TransactionRequest;
-
/**
* The OIL implementation of the DistributedJMSServer object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
*/
public class DistributedJMSServerOIL extends DistributedJMSServerUIL
implements DistributedJMSServerOILMBean
@@ -89,55 +88,61 @@
switch (code)
{
- case GetID:
- result=server.getID();
+ case m_setSpyDistributedConnection:
+ spyDistributedConnection =
(SpyDistributedConnection)in.readObject();
break;
- case NewMessage:
+ case m_acknowledge:
+
server.acknowledge(spyDistributedConnection, (AcknowledgementRequest)in.readObject());
+ break;
+ case m_addMessage:
server.addMessage(spyDistributedConnection, (SpyMessage)in.readObject());
break;
- case Subscribe:
-
server.subscribe(spyDistributedConnection, (Destination)in.readObject());
+ case m_browse:
+
result=server.browse(spyDistributedConnection, (Destination)in.readObject(),
(String)in.readObject());
break;
- case Unsubscribe:
-
server.unsubscribe(spyDistributedConnection,(Destination)in.readObject());
+ case m_checkID:
+
server.checkID((String)in.readObject());
break;
- case CreateTopic:
-
result=(Topic)server.createTopic((String)in.readObject());
+ case m_connectionClosing:
+
server.connectionClosing(spyDistributedConnection);
+ closed = true;
break;
- case CreateQueue:
-
result=(Queue)server.createQueue((String)in.readObject());
+ case this.m_createQueue:
+
result=(Queue)server.createQueue(spyDistributedConnection, (String)in.readObject());
break;
- case GetTemporaryTopic:
-
result=(TemporaryTopic)server.getTemporaryTopic(spyDistributedConnection);
+ case m_createTopic:
+
result=(Topic)server.createTopic(spyDistributedConnection, (String)in.readObject());
break;
- case GetTemporaryQueue:
-
result=(TemporaryQueue)server.getTemporaryQueue(spyDistributedConnection);
+ case m_deleteTemporaryDestination:
+
server.deleteTemporaryDestination(spyDistributedConnection,
(SpyDestination)in.readObject());
break;
- case ConnectionClosing:
-
server.connectionClosing(spyDistributedConnection,null);
- closed = true;
+ case this.m_getID:
+ result=server.getID();
break;
- case DeleteTemporaryDestination:
-
server.deleteTemporaryDestination((SpyDestination)in.readObject());
+ case m_getTemporaryQueue:
+
result=(TemporaryQueue)server.getTemporaryQueue(spyDistributedConnection);
break;
- case CheckID:
-
server.checkID((String)in.readObject());
+ case m_getTemporaryTopic:
+
result=(TemporaryTopic)server.getTemporaryTopic(spyDistributedConnection);
break;
- case QueueReceive:
-
result=server.queueReceive(spyDistributedConnection,(Queue)in.readObject(),
in.readLong());
+ case m_listenerChange:
+
server.listenerChange(spyDistributedConnection,in.readInt(),in.readBoolean());
break;
- case ConnectionListening:
-
server.connectionListening(spyDistributedConnection,in.readBoolean(),(Destination)in.readObject());
+ case m_receive:
+
result=server.receive(spyDistributedConnection,in.readInt(), in.readLong());
break;
- case Acknowledge:
-
server.acknowledge(spyDistributedConnection, (SpyAcknowledgementItem)in.readObject());
+ case m_setEnabled :
+
server.setEnabled(spyDistributedConnection, in.readBoolean());
break;
- case SetSpyDistributedConnection:
- spyDistributedConnection =
(SpyDistributedConnection)in.readObject();
+ case m_subscribe:
+
server.subscribe(spyDistributedConnection, (Subscription)in.readObject());
break;
- case Transact:
+ case m_transact:
server.transact(spyDistributedConnection, (TransactionRequest)in.readObject());
break;
+ case m_unsubscribe:
+
server.unsubscribe(spyDistributedConnection,in.readInt());
+ break;
default:
throw new RemoteException("Bad method
code !");
}
@@ -175,10 +180,14 @@
}
}
}
-
+
try {
- if( !closed )
-
server.connectionClosing(spyDistributedConnection,null);
+ if( !closed ) {
+ try {
+
server.connectionClosing(spyDistributedConnection);
+ } catch (JMSException e) {
+ }
+ }
socket.close();
} catch (IOException e ) {
Log.log("Could not gracefully close the connection with the
client");
1.6 +3 -2
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java
Index: DistributedJMSServerOILClient.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- DistributedJMSServerOILClient.java 2000/12/12 05:58:50 1.5
+++ DistributedJMSServerOILClient.java 2000/12/23 15:48:21 1.6
@@ -18,8 +18,8 @@
import org.spydermq.Log;
import org.spydermq.SpyDistributedConnection;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
-import org.spydermq.SpyAcknowledgementItem;
import org.spydermq.server.JMSServer;
+import org.spydermq.AcknowledgementRequest;
import java.rmi.RemoteException;
import java.io.ObjectOutputStream;
@@ -31,13 +31,14 @@
import java.net.Socket;
import java.net.InetAddress;
+
/**
*The OIL implementation of the DistributedJMSServer object
*
*@author Norbert Lataille ([EMAIL PROTECTED])
*@author Hiram Chirino ([EMAIL PROTECTED])
*
- *@version $Revision: 1.5 $
+ *@version $Revision: 1.6 $
*/
public class DistributedJMSServerOILClient extends DistributedJMSServerUILClient
implements DistributedJMSServer, Serializable {
1.5 +9 -6
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMI.java
Index: DistributedJMSServerRMI.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMI.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- DistributedJMSServerRMI.java 2000/12/19 06:43:37 1.4
+++ DistributedJMSServerRMI.java 2000/12/23 15:48:21 1.5
@@ -20,7 +20,7 @@
import org.spydermq.SpyDestination;
import org.spydermq.SpyDistributedConnection;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
-import org.spydermq.SpyAcknowledgementItem;
+import org.spydermq.AcknowledgementRequest;
/**
* The RMI interface of the DistributedJMSServer object
@@ -28,9 +28,10 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public interface DistributedJMSServerRMI extends DistributedJMSServer, Remote {
+
// Public --------------------------------------------------------
public String getID() throws RemoteException, Exception;
public void checkID(String ID) throws RemoteException, Exception;
@@ -38,14 +39,16 @@
public void connectionClosing(SpyDistributedConnection dc) throws
RemoteException, Exception;
public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
RemoteException, Exception;
public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws
RemoteException, Exception;
- public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem
item) throws RemoteException, Exception;
public void addMessage(SpyDistributedConnection dc, SpyMessage message) throws
RemoteException, Exception;
- public void connectionListening(SpyDistributedConnection dc, boolean mode,
Destination dest) throws RemoteException, Exception;
public Queue createQueue(SpyDistributedConnection dc, String dest) throws
RemoteException, Exception;
public Topic createTopic(SpyDistributedConnection dc, String dest) throws
RemoteException, Exception;
public void deleteTemporaryDestination(SpyDistributedConnection dc,
SpyDestination dest) throws RemoteException, Exception;
- public SpyMessage queueReceive(SpyDistributedConnection dc, Queue queue, long
wait) throws RemoteException, Exception;
- public void subscribe(SpyDistributedConnection dc, Destination dest) throws
RemoteException, Exception;
- public void unsubscribe(SpyDistributedConnection dc, Destination dest) throws
RemoteException, Exception;
public void transact(SpyDistributedConnection dc,
org.spydermq.TransactionRequest t) throws RemoteException, Exception;
+ public void acknowledge(SpyDistributedConnection dc, AcknowledgementRequest
item) throws RemoteException, Exception;
+ public SpyMessage[] browse(SpyDistributedConnection dc, Destination dest,
String selector) throws RemoteException, Exception;
+ public void listenerChange(SpyDistributedConnection dc, int subscriberId,
boolean state) throws RemoteException, Exception;
+ public SpyMessage receive(SpyDistributedConnection dc, int subscriberId, long
wait) throws RemoteException, Exception;
+ public void setEnabled(SpyDistributedConnection dc, boolean enabled) throws
RemoteException, Exception;
+ public void subscribe(SpyDistributedConnection dc, org.spydermq.Subscription
s) throws RemoteException, Exception;
+ public void unsubscribe(SpyDistributedConnection dc, int subscriptionId )
throws RemoteException, Exception;
}
1.7 +33 -29
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java
Index: DistributedJMSServerRMIImpl.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- DistributedJMSServerRMIImpl.java 2000/12/19 06:43:37 1.6
+++ DistributedJMSServerRMIImpl.java 2000/12/23 15:48:21 1.7
@@ -22,9 +22,9 @@
import org.spydermq.Log;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
-import org.spydermq.SpyAcknowledgementItem;
import org.spydermq.server.JMSServer;
import org.spydermq.TransactionRequest;
+import org.spydermq.AcknowledgementRequest;
/**
* The RMI implementation of the DistributedJMSServer object
@@ -32,7 +32,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class DistributedJMSServerRMIImpl
extends UnicastRemoteObject
@@ -63,17 +63,17 @@
public Topic createTopic(SpyDistributedConnection dc, String dest) throws
JMSException
{
- return server.createTopic(dest);
+ return server.createTopic(dc,dest);
}
public Queue createQueue(SpyDistributedConnection dc, String dest) throws
JMSException
{
- return server.createQueue(dest);
+ return server.createQueue(dc,dest);
}
public void deleteTemporaryDestination(SpyDistributedConnection dc,
SpyDestination dest) throws JMSException
{
- server.deleteTemporaryDestination(dest);
+ server.deleteTemporaryDestination(dc,dest);
}
public void checkID(String ID) throws JMSException
@@ -98,18 +98,9 @@
// to access the same remote object via RMI
}
- public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem
item) throws JMSException, RemoteException{
- server.acknowledge(dc, item);
- }
-
public void connectionClosing(SpyDistributedConnection dc) throws JMSException
- {
- server.connectionClosing(dc,null);
- }
-
- public void connectionListening(SpyDistributedConnection dc,boolean
mode,Destination dest) throws JMSException
{
- server.connectionListening(dc,mode,dest);
+ server.connectionClosing(dc);
}
public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
JMSException
@@ -122,22 +113,35 @@
return server.getTemporaryTopic(dc);
}
- public SpyMessage queueReceive(SpyDistributedConnection dc, Queue queue, long
wait) throws JMSException
- {
- return server.queueReceive(dc,queue, wait);
- }
-
- public void subscribe(SpyDistributedConnection dc, Destination dest) throws
JMSException
- {
- server.subscribe(dc,dest);
- }
-
- public void unsubscribe(SpyDistributedConnection dc, Destination dest) throws
JMSException
- {
- server.unsubscribe(dc,dest);
- }
-
public void transact(org.spydermq.SpyDistributedConnection dc,
TransactionRequest t) throws JMSException {
server.transact(dc,t);
+ }
+
+ public void acknowledge(SpyDistributedConnection dc, AcknowledgementRequest
item) throws Exception {
+ server.acknowledge(dc, item);
+ }
+
+ public SpyMessage[] browse(SpyDistributedConnection dc, Destination dest,
String selector) throws Exception {
+ return server.browse(dc, dest, selector);
+ }
+
+ public void listenerChange(SpyDistributedConnection dc, int subscriberId,
boolean state) throws Exception {
+ server.listenerChange(dc,subscriberId,state);
+ }
+
+ public SpyMessage receive(SpyDistributedConnection dc, int subscriberId, long
wait) throws Exception {
+ return server.receive(dc,subscriberId,wait);
+ }
+
+ public void setEnabled(SpyDistributedConnection dc, boolean enabled) throws
Exception {
+ server.setEnabled( dc, enabled);
+ }
+
+ public void subscribe(SpyDistributedConnection dc, org.spydermq.Subscription
s) throws Exception {
+ server.subscribe(dc,s);
+ }
+
+ public void unsubscribe(SpyDistributedConnection dc, int subscriptionId )
throws Exception{
+ server.unsubscribe(dc, subscriptionId);
}
}
1.7 +70 -65
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUIL.java
Index: DistributedJMSServerUIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUIL.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- DistributedJMSServerUIL.java 2000/12/19 06:43:37 1.6
+++ DistributedJMSServerUIL.java 2000/12/23 15:48:21 1.7
@@ -17,12 +17,13 @@
import org.spydermq.SpyDestination;
import org.spydermq.SpyDistributedConnection;
import org.spydermq.Log;
-import org.spydermq.SpyAcknowledgementItem;
import org.spydermq.server.JMSServer;
-
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
import org.spydermq.multiplexor.SocketMultiplexor;
+import org.spydermq.TransactionRequest;
+import org.spydermq.AcknowledgementRequest;
+import org.spydermq.Subscription;
import java.rmi.RemoteException;
import java.net.ServerSocket;
@@ -34,24 +35,40 @@
import java.io.BufferedInputStream;
import java.io.IOException;
-import org.spydermq.TransactionRequest;
-
/**
* The UIL implementation of the DistributedJMSServer object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class DistributedJMSServerUIL
implements Runnable, DistributedJMSServerSetup, DistributedJMSServerUILMBean
{
-
- // Attributes ----------------------------------------------------
+ static final int m_acknowledge = 1;
+ static final int m_addMessage = 2;
+ static final int m_browse = 3;
+ static final int m_checkID = 4;
+ static final int m_connectionClosing = 5;
+ static final int m_createQueue = 6;
+ static final int m_createTopic = 7;
+ static final int m_deleteTemporaryDestination = 8;
+ static final int m_getID = 9;
+ static final int m_getTemporaryQueue = 10;
+ static final int m_getTemporaryTopic = 11;
+ static final int m_listenerChange = 12;
+ static final int m_receive = 13;
+ static final int m_setEnabled = 14;
+ static final int m_setSpyDistributedConnection = 15;
+ static final int m_subscribe = 16;
+ static final int m_transact = 17;
+ static final int m_unsubscribe = 18;
//The server implementation
protected static JMSServer server;
+
+ protected ServerSocket serverSocket;
// Constructor ---------------------------------------------------
@@ -60,27 +77,6 @@
exportObject();
}
- // Internals -----------------------------------------------------
-
- static final int GetID = 1;
- static final int NewMessage = 2;
- static final int Subscribe = 3;
- static final int Unsubscribe = 4;
- static final int CreateTopic = 5;
- static final int CreateQueue = 6;
- static final int GetTemporaryTopic = 7;
- static final int GetTemporaryQueue = 8;
- static final int ConnectionClosing = 9;
- static final int DeleteTemporaryDestination = 10;
- static final int CheckID = 11;
- static final int QueueReceive = 12;
- static final int ConnectionListening = 13;
- static final int Acknowledge = 14;
- static final int SetSpyDistributedConnection = 15;
- static final int Transact = 16;
-
- protected ServerSocket serverSocket;
-
void exportObject()
{
try {
@@ -134,59 +130,65 @@
switch (code)
{
- case GetID:
- result=server.getID();
+ case m_setSpyDistributedConnection:
+ spyDistributedConnection =
(SpyDistributedConnection)in.readObject();
+ if( spyDistributedConnection.cr
instanceof ConnectionReceiverUILClient ) {
+
((ConnectionReceiverUILClient)spyDistributedConnection.cr).mSocket = mSocket;
+
((ConnectionReceiverUILClient)spyDistributedConnection.cr).createConnection();
+ }
break;
- case NewMessage:
+ case m_acknowledge:
+
server.acknowledge(spyDistributedConnection, (AcknowledgementRequest)in.readObject());
+ break;
+ case m_addMessage:
server.addMessage(spyDistributedConnection, (SpyMessage)in.readObject());
break;
- case Subscribe:
-
server.subscribe(spyDistributedConnection, (Destination)in.readObject());
+ case m_browse:
+
result=server.browse(spyDistributedConnection, (Destination)in.readObject(),
(String)in.readObject());
break;
- case Unsubscribe:
-
server.unsubscribe(spyDistributedConnection,(Destination)in.readObject());
+ case m_checkID:
+
server.checkID((String)in.readObject());
break;
- case CreateTopic:
-
result=(Topic)server.createTopic((String)in.readObject());
+ case m_connectionClosing:
+
server.connectionClosing(spyDistributedConnection);
+ closed = true;
break;
- case CreateQueue:
-
result=(Queue)server.createQueue((String)in.readObject());
+ case this.m_createQueue:
+
result=(Queue)server.createQueue(spyDistributedConnection, (String)in.readObject());
break;
- case GetTemporaryTopic:
-
result=(TemporaryTopic)server.getTemporaryTopic(spyDistributedConnection);
+ case m_createTopic:
+
result=(Topic)server.createTopic(spyDistributedConnection, (String)in.readObject());
break;
- case GetTemporaryQueue:
-
result=(TemporaryQueue)server.getTemporaryQueue(spyDistributedConnection);
+ case m_deleteTemporaryDestination:
+
server.deleteTemporaryDestination(spyDistributedConnection,
(SpyDestination)in.readObject());
break;
- case ConnectionClosing:
-
server.connectionClosing(spyDistributedConnection,null);
- closed = true;
+ case this.m_getID:
+ result=server.getID();
break;
- case DeleteTemporaryDestination:
-
server.deleteTemporaryDestination((SpyDestination)in.readObject());
+ case m_getTemporaryQueue:
+
result=(TemporaryQueue)server.getTemporaryQueue(spyDistributedConnection);
break;
- case CheckID:
-
server.checkID((String)in.readObject());
+ case m_getTemporaryTopic:
+
result=(TemporaryTopic)server.getTemporaryTopic(spyDistributedConnection);
break;
- case QueueReceive:
-
result=server.queueReceive(spyDistributedConnection,(Queue)in.readObject(),
in.readLong());
+ case m_listenerChange:
+
server.listenerChange(spyDistributedConnection,in.readInt(),in.readBoolean());
break;
- case ConnectionListening:
-
server.connectionListening(spyDistributedConnection,in.readBoolean(),(Destination)in.readObject());
+ case m_receive:
+
result=server.receive(spyDistributedConnection,in.readInt(), in.readLong());
break;
- case Acknowledge:
-
server.acknowledge(spyDistributedConnection, (SpyAcknowledgementItem)in.readObject());
+ case m_setEnabled :
+
server.setEnabled(spyDistributedConnection, in.readBoolean());
break;
- case SetSpyDistributedConnection:
- spyDistributedConnection =
(SpyDistributedConnection)in.readObject();
- if( spyDistributedConnection.cr
instanceof ConnectionReceiverUILClient ) {
-
((ConnectionReceiverUILClient)spyDistributedConnection.cr).mSocket = mSocket;
-
((ConnectionReceiverUILClient)spyDistributedConnection.cr).createConnection();
- }
+ case m_subscribe:
+
server.subscribe(spyDistributedConnection, (Subscription)in.readObject());
break;
- case Transact:
+ case m_transact:
server.transact(spyDistributedConnection, (TransactionRequest)in.readObject());
break;
+ case m_unsubscribe:
+
server.unsubscribe(spyDistributedConnection,in.readInt());
+ break;
default:
throw new RemoteException("Bad method
code !");
}
@@ -227,9 +229,12 @@
try {
- if( !closed )
-
server.connectionClosing(spyDistributedConnection,null);
-
+ if( !closed ) {
+ try {
+ server.connectionClosing(spyDistributedConnection);
+ } catch ( JMSException ignore ) {
+ }
+ }
mSocket.close();
} catch ( IOException e ) {
Log.log("Could not gracefully close the connection to the
server.");
1.6 +68 -43
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILClient.java
Index: DistributedJMSServerUILClient.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILClient.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- DistributedJMSServerUILClient.java 2000/12/19 06:43:37 1.5
+++ DistributedJMSServerUILClient.java 2000/12/23 15:48:21 1.6
@@ -17,8 +17,8 @@
import org.spydermq.SpyDestination;
import org.spydermq.Log;
import org.spydermq.SpyDistributedConnection;
-import org.spydermq.SpyAcknowledgementItem;
+
import org.spydermq.server.JMSServer;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.multiplexor.SocketMultiplexor;
@@ -34,6 +34,7 @@
import java.net.InetAddress;
import org.spydermq.TransactionRequest;
+import org.spydermq.AcknowledgementRequest;
/**
*The UIL implementation of the DistributedJMSServer object
@@ -41,26 +42,28 @@
*@author Norbert Lataille ([EMAIL PROTECTED])
*@author Hiram Chirino ([EMAIL PROTECTED])
*
- *@version $Revision: 1.5 $
+ *@version $Revision: 1.6 $
*/
public class DistributedJMSServerUILClient implements DistributedJMSServer,
Serializable {
- static final int GetID = 1;
- static final int NewMessage = 2;
- static final int Subscribe = 3;
- static final int Unsubscribe = 4;
- static final int CreateTopic = 5;
- static final int CreateQueue = 6;
- static final int GetTemporaryTopic = 7;
- static final int GetTemporaryQueue = 8;
- static final int ConnectionClouting = 9;
- static final int DeleteTemporaryDestination = 10;
- static final int CheckID = 11;
- static final int QueueReceive = 12;
- static final int ConnectionListening = 13;
- static final int Acknowledge = 14;
- static final int SetSpyDistributedConnection = 15;
- static final int Transact = 16;
+ static final int m_acknowledge = 1;
+ static final int m_addMessage = 2;
+ static final int m_browse = 3;
+ static final int m_checkID = 4;
+ static final int m_connectionClosing = 5;
+ static final int m_createQueue = 6;
+ static final int m_createTopic = 7;
+ static final int m_deleteTemporaryDestination = 8;
+ static final int m_getID = 9;
+ static final int m_getTemporaryQueue = 10;
+ static final int m_getTemporaryTopic = 11;
+ static final int m_listenerChange = 12;
+ static final int m_receive = 13;
+ static final int m_setEnabled = 14;
+ static final int m_setSpyDistributedConnection = 15;
+ static final int m_subscribe = 16;
+ static final int m_transact = 17;
+ static final int m_unsubscribe = 18;
//Remote stuff
protected int port;
@@ -117,7 +120,7 @@
public String getID() throws Exception {
checkConnection();
try {
- out.writeByte(GetID);
+ out.writeByte(m_getID);
} catch (IOException e) {
failure(e);
}
@@ -127,7 +130,7 @@
public void checkID(String ID) throws JMSException, RemoteException {
checkConnection();
try {
- out.writeByte(CheckID);
+ out.writeByte(m_checkID);
out.writeObject(ID);
} catch (IOException e) {
failure(e);
@@ -138,7 +141,7 @@
public void setSpyDistributedConnection(SpyDistributedConnection dest) throws
RemoteException {
checkConnection();
try {
- out.writeByte(SetSpyDistributedConnection);
+ out.writeByte(m_setSpyDistributedConnection);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
@@ -149,7 +152,7 @@
public void connectionClosing(SpyDistributedConnection dc) throws
JMSException, RemoteException {
checkConnection();
try {
- out.writeByte(ConnectionClouting);
+ out.writeByte(m_connectionClosing);
} catch (IOException e) {
failure(e);
}
@@ -159,7 +162,7 @@
public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
JMSException, RemoteException {
checkConnection();
try {
- out.writeByte(GetTemporaryQueue);
+ out.writeByte(m_getTemporaryQueue);
} catch (IOException e) {
failure(e);
}
@@ -169,17 +172,17 @@
public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws
JMSException, RemoteException {
checkConnection();
try {
- out.writeByte(GetTemporaryTopic);
+ out.writeByte(m_getTemporaryTopic);
} catch (IOException e) {
failure(e);
}
return (TemporaryTopic) waitAnswer();
}
- public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem
item) throws JMSException, RemoteException {
+ public void acknowledge(SpyDistributedConnection dc, AcknowledgementRequest
item) throws JMSException, RemoteException {
checkConnection();
try {
- out.writeByte(Acknowledge);
+ out.writeByte(m_acknowledge);
out.writeObject(item);
} catch (IOException e) {
failure(e);
@@ -190,7 +193,7 @@
public void addMessage(SpyDistributedConnection dc, SpyMessage val) throws
JMSException, RemoteException {
checkConnection();
try {
- out.writeByte(NewMessage);
+ out.writeByte(m_addMessage);
out.writeObject(val);
} catch (IOException e) {
failure(e);
@@ -203,12 +206,12 @@
createConnection();
}
- public void connectionListening(SpyDistributedConnection dc, boolean mode,
Destination dest) throws Exception, RemoteException {
+ public void listenerChange(SpyDistributedConnection dc, int subscriberId,
boolean state) throws Exception, RemoteException {
checkConnection();
try {
- out.writeByte(ConnectionListening);
- out.writeBoolean(mode);
- out.writeObject(dest);
+ out.writeByte(m_listenerChange);
+ out.writeInt(subscriberId);
+ out.writeBoolean(state);
} catch (IOException e) {
failure(e);
}
@@ -218,7 +221,7 @@
public Queue createQueue(SpyDistributedConnection dc, String dest) throws
JMSException, RemoteException {
checkConnection();
try {
- out.writeByte(CreateQueue);
+ out.writeByte(m_createQueue);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
@@ -229,7 +232,7 @@
public Topic createTopic(SpyDistributedConnection dc, String dest) throws
JMSException, RemoteException {
checkConnection();
try {
- out.writeByte(CreateTopic);
+ out.writeByte(m_createTopic);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
@@ -240,7 +243,7 @@
public void deleteTemporaryDestination(SpyDistributedConnection dc,
SpyDestination dest) throws JMSException, RemoteException {
checkConnection();
try {
- out.writeByte(DeleteTemporaryDestination);
+ out.writeByte(m_deleteTemporaryDestination);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
@@ -248,11 +251,11 @@
waitAnswer();
}
- public SpyMessage queueReceive(SpyDistributedConnection dc, Queue queue, long
wait) throws Exception, RemoteException {
+ public SpyMessage receive(SpyDistributedConnection dc, int subscriberId, long
wait) throws Exception, RemoteException {
checkConnection();
try {
- out.writeByte(QueueReceive);
- out.writeObject(queue);
+ out.writeByte(m_receive);
+ out.writeInt(subscriberId);
out.writeLong(wait);
} catch (IOException e) {
failure(e);
@@ -260,11 +263,11 @@
return (SpyMessage) waitAnswer();
}
- public void subscribe(SpyDistributedConnection dc, Destination dest) throws
JMSException, RemoteException {
+ public void subscribe(SpyDistributedConnection dc, org.spydermq.Subscription
s) throws JMSException, RemoteException {
checkConnection();
try {
- out.writeByte(Subscribe);
- out.writeObject(dest);
+ out.writeByte(m_subscribe);
+ out.writeObject(s);
} catch (IOException e) {
failure(e);
}
@@ -272,22 +275,44 @@
}
+ public void unsubscribe(SpyDistributedConnection dc, int subscriptionId )
throws JMSException, RemoteException {
+ checkConnection();
+ try {
+ out.writeByte(m_unsubscribe);
+ out.writeInt(subscriptionId);
+ } catch (IOException e) {
+ failure(e);
+ }
+ waitAnswer();
+ }
- public void unsubscribe(SpyDistributedConnection dc, Destination dest) throws
JMSException, RemoteException {
+ public void setEnabled(SpyDistributedConnection dc, boolean enabled) throws
JMSException, RemoteException {
checkConnection();
try {
- out.writeByte(Unsubscribe);
- out.writeObject(dest);
+ out.writeByte(m_setEnabled);
+ out.writeBoolean(enabled);
} catch (IOException e) {
failure(e);
}
waitAnswer();
}
+ public SpyMessage[] browse(SpyDistributedConnection dc, Destination dest,
String selector) throws JMSException, RemoteException {
+ checkConnection();
+ try {
+ out.writeByte(m_browse);
+ out.writeObject(dest);
+ out.writeObject(selector);
+ } catch (IOException e) {
+ failure(e);
+ }
+ return (SpyMessage[]) waitAnswer();
+ }
+
public void transact(org.spydermq.SpyDistributedConnection dc,
TransactionRequest t) throws JMSException, RemoteException {
checkConnection();
try {
- out.writeByte(Transact);
+ out.writeByte(m_transact);
out.writeObject(t);
} catch (IOException e) {
failure(e);