User: norbert
Date: 00/06/19 19:19:14
Modified: src/java/org/spydermq/distributed/server
ConnectionReceiverOIL.java
ConnectionReceiverOILClient.java
DistributedJMSServerOIL.java
DistributedJMSServerOILClient.java
Log:
Some optimizations to the OIL
Revision Changes Path
1.6 +59 -17
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
Index: ConnectionReceiverOIL.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- ConnectionReceiverOIL.java 2000/06/19 21:52:00 1.5
+++ ConnectionReceiverOIL.java 2000/06/20 02:19:13 1.6
@@ -27,11 +27,12 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.net.InetAddress;
-import java.io.ObjectOutputStream;
-import java.io.BufferedOutputStream;
-import java.io.ObjectInputStream;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedInputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.io.IOException;
/**
@@ -39,7 +40,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class ConnectionReceiverOIL
implements Runnable, ConnectionReceiverSetup
@@ -82,8 +83,10 @@
{
Socket socket = null;
int code = 0;
- InputStream is=null;
- OutputStream os=null;
+ //InputStream is=null;
+ //OutputStream os=null;
+ BufferedInputStream is=null;
+ BufferedOutputStream os=null;
ObjectOutputStream out=null;
ObjectInputStream in=null;
@@ -92,10 +95,14 @@
//We have our connection to the broker... there's no need to wait for another connection
//new Thread(this).start();
+
+ //is = socket.getInputStream();
+ //os = socket.getOutputStream();
+ is = new BufferedInputStream(socket.getInputStream());
+ os = new BufferedOutputStream(socket.getOutputStream());
- is = socket.getInputStream();
- os = socket.getOutputStream();
- out = new ObjectOutputStream(os);
+ out = new ObjectOutputStream(os);
+ out.flush();
in = new ObjectInputStream(is);
} catch (IOException e) {
@@ -106,13 +113,14 @@
while (true) {
try {
- code=is.read();
+ code=is.read();
} catch (IOException e) {
failure("Command read",e);
return;
}
try {
+
switch (code)
{
@@ -120,7 +128,9 @@
receive((SpyDestination)in.readObject(),(SpyMessage)in.readObject());
break;
case RECEIVE_MULTIPLE:
- receiveMultiple((SpyDestination)in.readObject(),(SpyMessage[])in.readObject());
+ SpyDestination dest=(SpyDestination)in.readObject();
+ int nb=in.readInt();
+ receiveMultiple(dest,nb,in);
break;
case DELETE_TEMPORARY_DESTINATION:
deleteTemporaryDestination((SpyDestination)in.readObject());
@@ -147,6 +157,7 @@
try {
os.write(1);
out.writeObject(e.getMessage());
+ out.flush();
os.flush();
} catch (IOException e2) {
failure("Result write",e2);
@@ -269,14 +280,45 @@
}
}
-
- public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws JMSException
- {
- for(int i=0;i<mes.length;i++) {
- receive(dest,mes[i]);
+ public void receiveMultiple(SpyDestination dest,int nb,ObjectInputStream in) throws Exception
+ {
+ if (closed) throw new IllegalStateException("The connection is closed");
+
+ Log.log("ConnectionReceiver: ReceiveMultiple()");
+
+ if (connection instanceof SpyTopicConnection) {
+
+ //Get the set of subscribers for this Topic
+
+ ConnectionQueue connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
+ if (connectionQueue==null) return;
+
+ for(int val=0;val<nb;val++) {
+
+ SpyMessage mes=(SpyMessage)in.readObject();
+
+ //NL: i is a short-lived object. Try to "group" messages in an pre-allocated/peristant
+ //array and apply the same iterator on this array
+ Iterator i=connectionQueue.subscribers.iterator();
+
+ while (i.hasNext()) {
+
+ SpySession session=(SpySession)i.next();
+
+ //add the new message to the session's queue
+ session.dispatchMessage(dest,mes);
+
+ //There is work to do...
+ session.mutex.notifyLock();
+ }
+ }
+
+ } else {
+ throw new Exception("Multiple dispatch for a Queue");
}
- }
+ }
+
public void close() throws Exception
{
1.7 +11 -6
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java
Index: ConnectionReceiverOILClient.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- ConnectionReceiverOILClient.java 2000/06/19 21:52:00 1.6
+++ ConnectionReceiverOILClient.java 2000/06/20 02:19:13 1.7
@@ -9,7 +9,9 @@
import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
import java.io.InputStream;
+import java.io.BufferedInputStream;
import java.io.OutputStream;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.Socket;
@@ -24,8 +26,8 @@
static final int CLOSE = 4;
private transient Socket socket;
- private transient InputStream is;
- private transient OutputStream os;
+ private transient BufferedInputStream is;
+ private transient BufferedOutputStream os;
private transient ObjectOutputStream out;
private transient ObjectInputStream in;
@@ -43,11 +45,12 @@
{
try {
socket=new Socket(addr,port);
- is=socket.getInputStream();
- os=socket.getOutputStream();
+ is = new BufferedInputStream(socket.getInputStream());
+ os = new BufferedOutputStream(socket.getOutputStream());
in=new ObjectInputStream(is);
out=new ObjectOutputStream(os);
- } catch (Exception e) {
+ os.flush();
+ } catch (Exception e) {
Log.error(e);
throw new RemoteException("Cannot connect to the ConnectionReceiver/Server");
}
@@ -85,7 +88,9 @@
if (socket==null) createConnection();
os.write(RECEIVE_MULTIPLE);
out.writeObject(dest);
- out.writeObject(mes);
+ out.writeInt(mes.length);
+ for(int i=0;i<mes.length;i++)
+ out.writeObject(mes[i]);
waitAnswer();
}
1.3 +31 -9
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java
Index: DistributedJMSServerOIL.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- DistributedJMSServerOIL.java 2000/06/19 21:52:00 1.2
+++ DistributedJMSServerOIL.java 2000/06/20 02:19:13 1.3
@@ -7,6 +7,7 @@
import javax.jms.TemporaryTopic;
import javax.jms.TemporaryQueue;
import org.spydermq.SpyMessage;
+import org.spydermq.JMSServerQueue;
import org.spydermq.SpyDestination;
import org.spydermq.JMSServer;
import org.spydermq.SpyDistributedConnection;
@@ -20,8 +21,7 @@
import java.io.ObjectOutputStream;
import java.io.BufferedOutputStream;
import java.io.ObjectInputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.BufferedInputStream;
import java.io.IOException;
public class DistributedJMSServerOIL
@@ -72,8 +72,8 @@
{
Socket socket = null;
int code = 0;
- InputStream is=null;
- OutputStream os=null;
+ BufferedInputStream is=null;
+ BufferedOutputStream os=null;
ObjectOutputStream out=null;
ObjectInputStream in=null;
@@ -82,9 +82,10 @@
new Thread(this).start();
- is = socket.getInputStream();
- os = socket.getOutputStream();
- out = new ObjectOutputStream(os);
+ is = new BufferedInputStream(socket.getInputStream());
+ os = new BufferedOutputStream(socket.getOutputStream());
+ out = new ObjectOutputStream(os);
+ os.flush();
in = new ObjectInputStream(is);
} catch (IOException e) {
@@ -110,8 +111,8 @@
case GetID:
result=server.getID();
break;
- case NewMessage:
- server.newMessage((SpyMessage[])in.readObject(),(String)in.readObject());
+ case NewMessage:
+ newMessage((String)in.readObject(),in.readInt(),in);
break;
case Subscribe:
server.subscribe((Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
@@ -184,6 +185,27 @@
{
Log.error("Closing socket: "+st);
Log.error(e);
+ }
+
+ void newMessage(String id,int nb,ObjectInputStream in) throws Exception
+ {
+ Log.notice("INCOMING: "+nb+" messages from "+id);
+
+ SpyDestination dest=null;
+ JMSServerQueue queue=null;
+
+ for(int i=0;i<nb;i++) {
+
+ SpyMessage mes=(SpyMessage)in.readObject();
+
+ if (dest==null||!dest.equals(mes.jmsDestination)) {
+ queue=(JMSServerQueue)server.messageQueue.get(mes.jmsDestination);
+ if (queue==null) throw new JMSException("This destination does not exist !"); //hum...
+ }
+
+ //Add the message to the queue
+ queue.addMessage(mes);
+ }
}
// --
1.2 +19 -23
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java
Index: DistributedJMSServerOILClient.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- DistributedJMSServerOILClient.java 2000/06/19 04:23:14 1.1
+++ DistributedJMSServerOILClient.java 2000/06/20 02:19:13 1.2
@@ -14,8 +14,8 @@
import java.rmi.RemoteException;
import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.Socket;
@@ -43,8 +43,8 @@
//Remote stuff
private transient Socket socket;
- private transient InputStream is;
- private transient OutputStream os;
+ private transient BufferedInputStream is;
+ private transient BufferedOutputStream os;
private transient ObjectOutputStream out;
private transient ObjectInputStream in;
@@ -62,10 +62,11 @@
{
try {
socket=new Socket(addr,port);
- is=socket.getInputStream();
- os=socket.getOutputStream();
+ is=new BufferedInputStream(socket.getInputStream());
+ os=new BufferedOutputStream(socket.getOutputStream());
in=new ObjectInputStream(is);
out=new ObjectOutputStream(os);
+ os.flush();
} catch (Exception e) {
failure(e);
}
@@ -98,34 +99,36 @@
//--- Remote Calls
- public String getID() throws Exception
+ public void newMessage(SpyMessage val[],String id) throws JMSException, RemoteException
{
if (socket==null) createConnection();
try {
- os.write(GetID);
+ os.write(NewMessage);
+ out.writeObject(id);
+ out.writeInt(val.length);
+ for(int i=0;i<val.length;i++)
+ out.writeObject(val[i]);
} catch (IOException e) {
failure(e);
}
- return (String)waitAnswer();
+ waitAnswer();
}
-
- public void newMessage(SpyMessage val[],String id) throws JMSException, RemoteException
+
+ public String getID() throws Exception
{
if (socket==null) createConnection();
try {
- os.write(NewMessage);
- out.writeObject(val);
- out.writeObject(id);
+ os.write(GetID);
} catch (IOException e) {
failure(e);
}
- waitAnswer();
+ return (String)waitAnswer();
}
-
+
public void subscribe(Destination dest,SpyDistributedConnection who) throws JMSException, RemoteException
{
if (socket==null) createConnection();
@@ -283,13 +286,6 @@
}
waitAnswer();
- }
-
- //--
-
- public void setServer(JMSServer s) throws Exception
- {
- //Nothing !
}
}