User: pkendall
Date: 01/08/08 17:28:42
Modified: src/main/org/jbossmq/il/uil UILClientIL.java
UILClientILService.java UILServerIL.java
UILServerILService.java
Log:
Major updates (especially to topics).
Speed improvements.
Make JVM IL work (by using a singleton JMSServer).
Message Listeners re-implemented using client-side thread.
Revision Changes Path
1.3 +9 -7 jbossmq/src/main/org/jbossmq/il/uil/UILClientIL.java
Index: UILClientIL.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/uil/UILClientIL.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- UILClientIL.java 2001/07/16 02:51:46 1.2
+++ UILClientIL.java 2001/08/09 00:28:42 1.3
@@ -24,11 +24,11 @@
/**
* The RMI implementation of the ConnectionReceiver object
- *
+ *
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
- *
- * @version $Revision: 1.2 $
+ *
+ * @version $Revision: 1.3 $
*/
public class UILClientIL implements ClientIL, java.io.Serializable {
@@ -53,7 +53,9 @@
checkSocket();
cat.debug("Writing request");
out.writeByte(m_receive);
- out.writeObject(messages);
+ out.writeInt(messages.length);
+ for(int i=0;i<messages.length;++i)
+ messages[i].writeExternal(out);
cat.debug("Waiting for awnser");
waitAnswer();
cat.debug("Done");
@@ -78,7 +80,7 @@
in=new ObjectInputStream(new
BufferedInputStream(mSocket.getInputStream(2)));
out=new ObjectOutputStream(new
BufferedOutputStream(mSocket.getOutputStream(2)));
out.flush();
- } catch (Exception e) {
+ } catch (Exception e) {
throw new RemoteException("Cannot connect to the
ConnectionReceiver/Server");
}
}
@@ -91,15 +93,15 @@
out.flush();
int val=in.readByte();
switch(val) {
- case 1:
+ case 1:
Exception e=(Exception)in.readObject();
throwException = new RemoteException("", e);
break;
}
- } catch (IOException e) {
+ } catch (IOException e) {
throw new RemoteException("Cannot contact the remote
object",e);
}
-
+
if( throwException != null )
throw throwException;
}
1.4 +14 -8 jbossmq/src/main/org/jbossmq/il/uil/UILClientILService.java
Index: UILClientILService.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/uil/UILClientILService.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- UILClientILService.java 2001/08/01 01:18:22 1.3
+++ UILClientILService.java 2001/08/09 00:28:42 1.4
@@ -25,11 +25,11 @@
/**
* The RMI implementation of the ConnectionReceiver object
- *
+ *
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class UILClientILService implements org.jbossmq.il.ClientILService, Runnable
{
//the client IL
@@ -54,16 +54,16 @@
* start method comment.
*/
public void start() throws java.lang.Exception {
-
+
running = true;
worker = new Thread(connection.threadGroup, this,
"UILClientILService");
worker.setDaemon(true);
- worker.start();
-
+ worker.start();
+
}
/**
- *
+ *
*/
public void stop() throws java.lang.Exception {
running = false;
@@ -84,7 +84,7 @@
public void run() {
cat.debug("UILClientILService.run()");
-
+
Socket socket = null;
int code = 0;
@@ -120,7 +120,13 @@
switch (code) {
case m_receive:
-
connection.asynchDeliver((org.jbossmq.ReceiveRequest[])in.readObject());
+ int numReceives = in.readInt();
+ org.jbossmq.ReceiveRequest [] messages = new
org.jbossmq.ReceiveRequest[numReceives];
+ for(int i=0;i<numReceives;++i){
+ messages[i] = new ReceiveRequest();
+ messages[i].readExternal(in);
+ }
+ connection.asynchDeliver(messages);
break;
case m_deleteTemporaryDestination:
connection.asynchDeleteTemporaryDestination((SpyDestination) in.readObject());
1.3 +19 -18 jbossmq/src/main/org/jbossmq/il/uil/UILServerIL.java
Index: UILServerIL.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/uil/UILServerIL.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- UILServerIL.java 2001/07/16 02:51:46 1.2
+++ UILServerIL.java 2001/08/09 00:28:42 1.3
@@ -20,6 +20,7 @@
import org.jbossmq.ConnectionToken;
import org.jbossmq.il.ServerIL;
import org.jbossmq.server.JMSServer;
+import org.jbossmq.DurableSubcriptionID;
import org.jbossmq.il.uil.multiplexor.SocketMultiplexor;
import java.io.ObjectInputStream;
@@ -32,11 +33,11 @@
/**
* The JVM implementation of the ServerIL object
- *
+ *
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
- *
- * @version $Revision: 1.2 $
+ *
+ * @version $Revision: 1.3 $
*/
public class UILServerIL implements ServerIL, java.io.Serializable, Cloneable {
@@ -49,7 +50,7 @@
public ServerIL cloneServerIL() throws Exception {
return (ServerIL)clone();
}
-
+
synchronized public void setConnectionToken(ConnectionToken dest) throws
Exception {
checkConnection();
out.writeByte(m_setSpyDistributedConnection);
@@ -66,7 +67,7 @@
synchronized public void addMessage(ConnectionToken dc, SpyMessage val) throws
Exception {
checkConnection();
out.writeByte(m_addMessage);
- out.writeObject(val);
+ SpyMessage.writeMessage(val,out);
waitAnswer();
}
@@ -119,7 +120,7 @@
synchronized public void acknowledge(ConnectionToken dc,
AcknowledgementRequest item) throws JMSException, Exception {
checkConnection();
out.writeByte(m_acknowledge);
- out.writeObject(item);
+ item.writeExternal(out);
waitAnswer();
}
@@ -131,14 +132,6 @@
return (SpyMessage[]) waitAnswer();
}
- synchronized public void listenerChange(ConnectionToken dc, int subscriberId,
boolean state) throws Exception, Exception {
- checkConnection();
- out.writeByte(m_listenerChange);
- out.writeInt(subscriberId);
- out.writeBoolean(state);
- waitAnswer();
- }
-
synchronized public SpyMessage receive(ConnectionToken dc, int subscriberId,
long wait) throws Exception, Exception {
checkConnection();
out.writeByte(m_receive);
@@ -161,12 +154,19 @@
waitAnswer();
}
+ synchronized public void destroySubscription(DurableSubcriptionID id) throws
JMSException, Exception {
+ checkConnection();
+ out.writeByte(m_destroySubscription);
+ out.writeObject(id);
+ waitAnswer();
+ }
+
synchronized public String checkUser(String userName, String password) throws
JMSException, Exception {
checkConnection();
out.writeByte(m_checkUser);
out.writeObject(userName);
out.writeObject(password);
- return (String)waitAnswer();
+ return (String)waitAnswer();
}
synchronized public void subscribe(ConnectionToken dc,
org.jbossmq.Subscription s) throws JMSException, Exception {
@@ -179,7 +179,7 @@
synchronized public void transact(org.jbossmq.ConnectionToken dc,
TransactionRequest t) throws JMSException, Exception {
checkConnection();
out.writeByte(m_transact);
- out.writeObject(t);
+ t.writeExternal(out);
waitAnswer();
}
@@ -189,7 +189,6 @@
static final int m_addMessage = 2;
static final int m_browse = 3;
static final int m_checkID = 4;
- static final int m_checkUser = 19;
static final int m_connectionClosing = 5;
static final int m_createQueue = 6;
static final int m_createTopic = 7;
@@ -197,13 +196,15 @@
static final int m_getID = 9;
static final int m_getTemporaryQueue = 10;
static final int m_getTemporaryTopic = 11;
- static final int m_listenerChange = 12;
static final int m_receive = 13;
static final int m_setEnabled = 14;
static final int m_setSpyDistributedConnection = 15;
static final int m_subscribe = 16;
static final int m_transact = 17;
static final int m_unsubscribe = 18;
+ static final int m_destroySubscription = 19;
+ static final int m_checkUser = 20;
+
protected transient SocketMultiplexor mSocket;
protected transient ObjectOutputStream out;
//Remote stuff
1.7 +50 -43 jbossmq/src/main/org/jbossmq/il/uil/UILServerILService.java
Index: UILServerILService.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/uil/UILServerILService.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- UILServerILService.java 2001/08/01 01:18:22 1.6
+++ UILServerILService.java 2001/08/09 00:28:42 1.7
@@ -16,6 +16,7 @@
import org.jbossmq.il.ServerILJMXService;
import org.jbossmq.il.ServerIL;
import org.jbossmq.server.JMSServer;
+import org.jbossmq.DurableSubcriptionID;
import java.net.Socket;
import javax.jms.JMSException;
@@ -42,14 +43,14 @@
import org.jbossmq.SpyDestination;
/**
- * Implements the ServerILJMXService which is used to
+ * Implements the ServerILJMXService which is used to
* manage the JVM IL.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class UILServerILService extends ServerILJMXService implements
UILServerILServiceMBean, Runnable {
-
+
UILServerIL serverIL;
/**
@@ -77,13 +78,13 @@
serverSocket = new ServerSocket(serverBindPort);
serverSocket.setSoTimeout(SO_TIMEOUT);
-
+
category.info("JBossMQ UIL service available at :
"+InetAddress.getLocalHost().getHostAddress()+ ":"+ serverSocket.getLocalPort());
worker = new Thread(server.threadGroup, this,"UIL Worker");
worker.start();
-
+
serverIL = new UILServerIL(InetAddress.getLocalHost(),
serverSocket.getLocalPort());
-
+
bindJNDIReferences();
}
@@ -115,7 +116,6 @@
static final int m_addMessage = 2;
static final int m_browse = 3;
static final int m_checkID = 4;
- static final int m_checkUser = 19;
static final int m_connectionClosing = 5;
static final int m_createQueue = 6;
static final int m_createTopic = 7;
@@ -130,6 +130,9 @@
static final int m_subscribe = 16;
static final int m_transact = 17;
static final int m_unsubscribe = 18;
+ static final int m_destroySubscription = 19;
+ static final int m_checkUser = 20;
+
boolean running;
//The server implementation
protected static JMSServer server;
@@ -147,9 +150,9 @@
ObjectInputStream in=null;
ConnectionToken connectionToken=null;
boolean closed = false;
-
+
try {
-
+
while( running && socket==null ) {
try {
socket = serverSocket.accept();
@@ -167,7 +170,7 @@
out = new ObjectOutputStream(new
BufferedOutputStream(mSocket.getOutputStream(1)));
out.flush();
-
+
in = new ObjectInputStream(new
BufferedInputStream(mSocket.getInputStream(1)));
} catch (IOException e) {
@@ -187,15 +190,15 @@
catch (IOException e) {
if( closed || !running )
break;
-
+
category.warn("Connection failure (1).", e);
break;
}
-
+
try {
-
- Object result=null;
-
+
+ Object result=null;
+
switch (code)
{
case m_setSpyDistributedConnection:
@@ -205,44 +208,43 @@
((UILClientIL)connectionToken.clientIL).createConnection();
category.debug("The UILClientIL Connection is
set up");
break;
-
+
case m_acknowledge:
- server.acknowledge(connectionToken,
(AcknowledgementRequest)in.readObject());
+ AcknowledgementRequest ack = new AcknowledgementRequest();
+ ack.readExternal(in);
+ server.acknowledge(connectionToken,
ack);
break;
- case m_addMessage:
- server.addMessage(connectionToken,
(SpyMessage)in.readObject());
+ case m_addMessage:
+ server.addMessage(connectionToken,
SpyMessage.readMessage(in));
break;
- case m_browse:
+ case m_browse:
result=server.browse(connectionToken,
(Destination)in.readObject(), (String)in.readObject());
break;
- case m_checkID:
+ case m_checkID:
server.checkID((String)in.readObject());
break;
- case m_connectionClosing:
+ case m_connectionClosing:
server.connectionClosing(connectionToken);
closed = true;
break;
- case m_createQueue:
+ case m_createQueue:
result=(Queue)server.createQueue(connectionToken, (String)in.readObject());
break;
- case m_createTopic:
+ case m_createTopic:
result=(Topic)server.createTopic(connectionToken, (String)in.readObject());
break;
- case m_deleteTemporaryDestination:
+ case m_deleteTemporaryDestination:
server.deleteTemporaryDestination(connectionToken, (SpyDestination)in.readObject());
break;
- case m_getID:
+ case m_getID:
result=server.getID();
break;
- case m_getTemporaryQueue:
+ case m_getTemporaryQueue:
result=(TemporaryQueue)server.getTemporaryQueue(connectionToken);
break;
case m_getTemporaryTopic:
result=(TemporaryTopic)server.getTemporaryTopic(connectionToken);
break;
- case m_listenerChange:
-
server.listenerChange(connectionToken,in.readInt(),in.readBoolean());
- break;
case m_receive:
result=server.receive(connectionToken,in.readInt(), in.readLong());
break;
@@ -253,22 +255,27 @@
server.subscribe(connectionToken,
(Subscription)in.readObject());
break;
case m_transact:
- server.transact(connectionToken,
(TransactionRequest)in.readObject());
+ TransactionRequest trans = new TransactionRequest();
+ trans.readExternal(in);
+ server.transact(connectionToken,
trans);
break;
- case m_unsubscribe:
+ case m_unsubscribe:
server.unsubscribe(connectionToken,in.readInt());
+ break;
+ case m_destroySubscription:
+
server.destroySubscription((DurableSubcriptionID)in.readObject());
break;
- case m_checkUser:
+ case m_checkUser:
result =
server.checkUser((String)in.readObject(), (String)in.readObject());
break;
default:
throw new RemoteException("Bad method
code !");
}
-
+
//Everthing was OK
-
+
try {
- if (result==null)
+ if (result==null)
out.writeByte(0);
else {
out.writeByte(1);
@@ -279,15 +286,15 @@
} catch (IOException e) {
if( closed )
break;
-
+
category.warn("Connection failure (2).", e);
- break;
+ break;
}
-
+
} catch (Exception e) {
if( closed )
break;
-
+
category.info("Client request resulted in a server
exception: ", e);
try {
@@ -298,10 +305,10 @@
} catch (IOException e2) {
if( closed )
break;
-
+
category.warn("Connection failure (3).", e);
- break;
- }
+ break;
+ }
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development