User: pkendall
Date: 01/08/08 17:27:21
Modified: src/main/org/jbossmq/il/oil OILClientIL.java
OILClientILService.java OILServerIL.java
OILServerILService.java
Log:
Major updates (especially to topics).
Speed improvements.
Make JVM IL work (by using a singleton JMSServer).
Message Listeners re-implemented using client-side thread.
Revision Changes Path
1.3 +10 -12 jbossmq/src/main/org/jbossmq/il/oil/OILClientIL.java
Index: OILClientIL.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/oil/OILClientIL.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- OILClientIL.java 2001/07/16 02:51:45 1.2
+++ OILClientIL.java 2001/08/09 00:27:21 1.3
@@ -25,18 +25,14 @@
/**
* The RMI implementation of the ConnectionReceiver object
- *
+ *
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
- *
- * @version $Revision: 1.2 $
+ *
+ * @version $Revision: 1.3 $
*/
public class OILClientIL implements ClientIL, java.io.Serializable {
-
-
-
-
public void close() throws Exception {
checkSocket();
out.writeByte(m_close);
@@ -57,7 +53,9 @@
checkSocket();
cat.debug("Writing request");
out.writeByte(m_receive);
- out.writeObject(messages);
+ out.writeInt(messages.length);
+ for(int i=0;i<messages.length;++i)
+ messages[i].writeExternal(out);
cat.debug("Waiting for awnser");
waitAnswer();
cat.debug("Done");
@@ -84,13 +82,13 @@
protected void createConnection() throws RemoteException
{
- try {
+ try {
cat.info("ConnectionReceiverOILClient is connecting to:
"+addr.getHostAddress()+":"+port);
socket=new Socket(addr,port);
out=new ObjectOutputStream(new
BufferedOutputStream(socket.getOutputStream()));
out.flush();
in=new ObjectInputStream(new
BufferedInputStream(socket.getInputStream()));
- } catch (Exception e) {
+ } catch (Exception e) {
cat.debug(e);
throw new RemoteException("Cannot connect to the
ConnectionReceiver/Server");
}
@@ -104,15 +102,15 @@
out.flush();
int val=in.readByte();
switch(val) {
- case 1:
+ case 1:
Exception e=(Exception)in.readObject();
throwException = new RemoteException("", e);
break;
}
- } catch (IOException e) {
+ } catch (IOException e) {
throw new RemoteException("Cannot contact the remote
object",e);
}
-
+
if( throwException != null )
throw throwException;
}
1.4 +36 -15 jbossmq/src/main/org/jbossmq/il/oil/OILClientILService.java
Index: OILClientILService.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/oil/OILClientILService.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- OILClientILService.java 2001/08/01 01:18:32 1.3
+++ OILClientILService.java 2001/08/09 00:27:21 1.4
@@ -26,11 +26,11 @@
/**
* The RMI implementation of the ConnectionReceiver object
- *
+ *
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class OILClientILService implements org.jbossmq.il.ClientILService, Runnable
{
//the client IL
@@ -56,21 +56,24 @@
* start method comment.
*/
public void start() throws java.lang.Exception {
-
+
running = true;
worker = new Thread(connection.threadGroup, this,
"OILClientILService");
worker.setDaemon(true);
- worker.start();
-
+ worker.start();
+
}
/**
- *
+ *
*/
public void stop() throws java.lang.Exception {
running = false;
worker.interrupt();
- socket.close();
+ if(serverSocket != null)
+ serverSocket.close();
+ if(socket != null)
+ socket.close();
}
static org.apache.log4j.Category cat =
org.apache.log4j.Category.getInstance(OILClientILService.class);
@@ -98,16 +101,19 @@
cat.debug("Waiting for the server to connect to me");
// We may close() before we get a connection so we need to
// periodicaly check to see if we were !running.
- serverSocket.setSoTimeout(1000);
+ serverSocket.setSoTimeout(1000);
while( socket==null ) {
- try {
+ try {
socket = serverSocket.accept();
} catch (java.io.InterruptedIOException e) {
- }
+ } catch (IOException e){
+ if(running)
+ connection.asynchFailure("Error accepting connection from server in
OILClientILService.", e);
+ return;
+ }
if( !running )
return;
}
- serverSocket.close();
socket.setSoTimeout(0);
out = new ObjectOutputStream(new
BufferedOutputStream(socket.getOutputStream()));
out.flush();
@@ -116,7 +122,13 @@
} catch (IOException e) {
connection.asynchFailure("Could not initialize the OILClientIL
Service.", e);
return;
- }
+ } finally {
+ try{
+ serverSocket.close();
+ serverSocket = null;
+ }catch(IOException e){
+ }
+ }
while (running) {
@@ -133,7 +145,13 @@
switch (code) {
case m_receive:
-
connection.asynchDeliver((org.jbossmq.ReceiveRequest[])in.readObject());
+ int numReceives = in.readInt();
+ org.jbossmq.ReceiveRequest [] messages = new
org.jbossmq.ReceiveRequest[numReceives];
+ for(int i=0;i<numReceives;++i){
+ messages[i] = new ReceiveRequest();
+ messages[i].readExternal(in);
+ }
+ connection.asynchDeliver(messages);
break;
case m_deleteTemporaryDestination:
connection.asynchDeleteTemporaryDestination((SpyDestination) in.readObject());
@@ -154,8 +172,8 @@
connection.asynchFailure("Connection failure",
e);
return;
}
+
-
} catch (Exception e) {
if (!running)
break;
@@ -174,11 +192,14 @@
}
}
-
+
try {
cat.debug("Closing receiver connections.");
out.close();
in.close();
+ if(socket != null);
+ socket.close();
+ socket = null;
} catch (IOException e) {
connection.asynchFailure("Connection failure", e);
return;
1.3 +18 -18 jbossmq/src/main/org/jbossmq/il/oil/OILServerIL.java
Index: OILServerIL.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/oil/OILServerIL.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- OILServerIL.java 2001/07/16 02:51:45 1.2
+++ OILServerIL.java 2001/08/09 00:27:21 1.3
@@ -19,6 +19,7 @@
import org.jbossmq.SpyDestination;
import org.jbossmq.ConnectionToken;
import org.jbossmq.il.ServerIL;
+import org.jbossmq.DurableSubcriptionID;
import org.jbossmq.server.JMSServer;
import org.jbossmq.il.uil.multiplexor.SocketMultiplexor;
@@ -32,11 +33,11 @@
/**
* The JVM implementation of the ServerIL object
- *
+ *
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
- *
- * @version $Revision: 1.2 $
+ *
+ * @version $Revision: 1.3 $
*/
public class OILServerIL implements ServerIL, java.io.Serializable, Cloneable {
@@ -49,7 +50,7 @@
public ServerIL cloneServerIL() throws Exception {
return (ServerIL)clone();
}
-
+
synchronized public void setConnectionToken(ConnectionToken dest) throws
Exception {
checkConnection();
out.writeByte(m_setSpyDistributedConnection);
@@ -66,7 +67,7 @@
synchronized public void addMessage(ConnectionToken dc, SpyMessage val) throws
Exception {
checkConnection();
out.writeByte(m_addMessage);
- out.writeObject(val);
+ SpyMessage.writeMessage(val,out);
waitAnswer();
}
@@ -119,7 +120,7 @@
synchronized public void acknowledge(ConnectionToken dc,
AcknowledgementRequest item) throws JMSException, Exception {
checkConnection();
out.writeByte(m_acknowledge);
- out.writeObject(item);
+ item.writeExternal(out);
waitAnswer();
}
@@ -131,14 +132,6 @@
return (SpyMessage[]) waitAnswer();
}
- synchronized public void listenerChange(ConnectionToken dc, int subscriberId,
boolean state) throws Exception, Exception {
- checkConnection();
- out.writeByte(m_listenerChange);
- out.writeInt(subscriberId);
- out.writeBoolean(state);
- waitAnswer();
- }
-
synchronized public SpyMessage receive(ConnectionToken dc, int subscriberId,
long wait) throws Exception, Exception {
checkConnection();
out.writeByte(m_receive);
@@ -161,12 +154,19 @@
waitAnswer();
}
+ synchronized public void destroySubscription(DurableSubcriptionID id) throws
JMSException, Exception {
+ checkConnection();
+ out.writeByte(m_destroySubscription);
+ out.writeObject(id);
+ waitAnswer();
+ }
+
synchronized public String checkUser(String userName, String password) throws
JMSException, Exception {
checkConnection();
out.writeByte(m_checkUser);
out.writeObject(userName);
out.writeObject(password);
- return (String)waitAnswer();
+ return (String)waitAnswer();
}
synchronized public void subscribe(ConnectionToken dc,
org.jbossmq.Subscription s) throws JMSException, Exception {
@@ -179,7 +179,7 @@
synchronized public void transact(org.jbossmq.ConnectionToken dc,
TransactionRequest t) throws JMSException, Exception {
checkConnection();
out.writeByte(m_transact);
- out.writeObject(t);
+ t.writeExternal(out);
waitAnswer();
}
@@ -189,7 +189,6 @@
static final int m_addMessage = 2;
static final int m_browse = 3;
static final int m_checkID = 4;
- static final int m_checkUser = 19;
static final int m_connectionClosing = 5;
static final int m_createQueue = 6;
static final int m_createTopic = 7;
@@ -197,13 +196,14 @@
static final int m_getID = 9;
static final int m_getTemporaryQueue = 10;
static final int m_getTemporaryTopic = 11;
- static final int m_listenerChange = 12;
static final int m_receive = 13;
static final int m_setEnabled = 14;
static final int m_setSpyDistributedConnection = 15;
static final int m_subscribe = 16;
static final int m_transact = 17;
static final int m_unsubscribe = 18;
+ static final int m_destroySubscription = 19;
+ static final int m_checkUser = 20;
protected transient ObjectOutputStream out;
//Remote stuff
1.7 +49 -43 jbossmq/src/main/org/jbossmq/il/oil/OILServerILService.java
Index: OILServerILService.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jbossmq/il/oil/OILServerILService.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- OILServerILService.java 2001/08/01 01:18:32 1.6
+++ OILServerILService.java 2001/08/09 00:27:21 1.7
@@ -16,6 +16,7 @@
import org.jbossmq.il.ServerILJMXService;
import org.jbossmq.il.ServerIL;
import org.jbossmq.server.JMSServer;
+import org.jbossmq.DurableSubcriptionID;
import java.net.Socket;
import javax.jms.JMSException;
@@ -42,14 +43,14 @@
import org.jbossmq.SpyDestination;
/**
- * Implements the ServerILJMXService which is used to
+ * Implements the ServerILJMXService which is used to
* manage the JVM IL.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class OILServerILService extends ServerILJMXService implements
OILServerILServiceMBean, Runnable {
-
+
OILServerIL serverIL;
/**
@@ -77,13 +78,13 @@
serverSocket = new ServerSocket(serverBindPort);
serverSocket.setSoTimeout(SO_TIMEOUT);
-
+
category.info("JBossMQ OIL service available at :
"+InetAddress.getLocalHost().getHostAddress()+ ":"+ serverSocket.getLocalPort());
worker = new Thread(server.threadGroup, this,"OIL Worker");
worker.start();
-
+
serverIL = new OILServerIL(InetAddress.getLocalHost(),
serverSocket.getLocalPort());
-
+
bindJNDIReferences();
}
@@ -115,7 +116,6 @@
static final int m_addMessage = 2;
static final int m_browse = 3;
static final int m_checkID = 4;
- static final int m_checkUser = 19;
static final int m_connectionClosing = 5;
static final int m_createQueue = 6;
static final int m_createTopic = 7;
@@ -123,13 +123,15 @@
static final int m_getID = 9;
static final int m_getTemporaryQueue = 10;
static final int m_getTemporaryTopic = 11;
- static final int m_listenerChange = 12;
static final int m_receive = 13;
static final int m_setEnabled = 14;
static final int m_setSpyDistributedConnection = 15;
static final int m_subscribe = 16;
static final int m_transact = 17;
static final int m_unsubscribe = 18;
+ static final int m_destroySubscription = 19;
+ static final int m_checkUser = 20;
+
boolean running;
//The server implementation
protected static JMSServer server;
@@ -147,7 +149,7 @@
ObjectInputStream in=null;
ConnectionToken connectionToken=null;
boolean closed = false;
-
+
try {
while( running && socket==null ) {
try {
@@ -161,7 +163,7 @@
socket.setSoTimeout(0);
new Thread(this,"OIL Worker").start();
-
+
out = new ObjectOutputStream(new
BufferedOutputStream(socket.getOutputStream()));
out.flush();
in = new ObjectInputStream(new
BufferedInputStream(socket.getInputStream()));
@@ -187,54 +189,53 @@
category.warn("Connection failure (1).", e);
break;
}
-
+
try {
-
- Object result=null;
-
+
+ Object result=null;
+
switch (code)
{
case m_setSpyDistributedConnection:
connectionToken =
(ConnectionToken)in.readObject();
- category.debug("The UILClientIL Connection is
set up");
- break;
+ category.debug("The UILClientIL Connection
is set up");
+ break;
case m_acknowledge:
- server.acknowledge(connectionToken,
(AcknowledgementRequest)in.readObject());
+ AcknowledgementRequest ack = new AcknowledgementRequest();
+ ack.readExternal(in);
+ server.acknowledge(connectionToken,
ack);
break;
- case m_addMessage:
- server.addMessage(connectionToken,
(SpyMessage)in.readObject());
+ case m_addMessage:
+ server.addMessage(connectionToken,
SpyMessage.readMessage(in));
break;
- case m_browse:
+ case m_browse:
result=server.browse(connectionToken,
(Destination)in.readObject(), (String)in.readObject());
break;
- case m_checkID:
+ case m_checkID:
server.checkID((String)in.readObject());
break;
- case m_connectionClosing:
+ case m_connectionClosing:
server.connectionClosing(connectionToken);
closed = true;
break;
- case m_createQueue:
+ case m_createQueue:
result=(Queue)server.createQueue(connectionToken, (String)in.readObject());
break;
- case m_createTopic:
+ case m_createTopic:
result=(Topic)server.createTopic(connectionToken, (String)in.readObject());
break;
- case m_deleteTemporaryDestination:
+ case m_deleteTemporaryDestination:
server.deleteTemporaryDestination(connectionToken, (SpyDestination)in.readObject());
break;
- case m_getID:
+ case m_getID:
result=server.getID();
break;
- case m_getTemporaryQueue:
+ case m_getTemporaryQueue:
result=(TemporaryQueue)server.getTemporaryQueue(connectionToken);
break;
case m_getTemporaryTopic:
result=(TemporaryTopic)server.getTemporaryTopic(connectionToken);
break;
- case m_listenerChange:
-
server.listenerChange(connectionToken,in.readInt(),in.readBoolean());
- break;
case m_receive:
result=server.receive(connectionToken,in.readInt(), in.readLong());
break;
@@ -245,22 +246,27 @@
server.subscribe(connectionToken,
(Subscription)in.readObject());
break;
case m_transact:
- server.transact(connectionToken,
(TransactionRequest)in.readObject());
+ TransactionRequest trans = new TransactionRequest();
+ trans.readExternal(in);
+ server.transact(connectionToken,
trans);
break;
- case m_unsubscribe:
+ case m_unsubscribe:
server.unsubscribe(connectionToken,in.readInt());
+ break;
+ case m_destroySubscription:
+
server.destroySubscription((DurableSubcriptionID)in.readObject());
break;
- case m_checkUser:
+ case m_checkUser:
result =
server.checkUser((String)in.readObject(), (String)in.readObject());
break;
default:
throw new RemoteException("Bad method
code !");
}
-
+
//Everthing was OK
-
+
try {
- if (result==null)
+ if (result==null)
out.writeByte(0);
else {
out.writeByte(1);
@@ -271,15 +277,15 @@
} catch (IOException e) {
if( closed )
break;
-
+
category.warn("Connection failure (2).", e);
- break;
+ break;
}
-
+
} catch (Exception e) {
if( closed )
break;
-
+
category.info("Client request resulted in a server
exception: ", e);
try {
@@ -290,10 +296,10 @@
} catch (IOException e2) {
if( closed )
break;
-
+
category.warn("Connection failure (3).", e);
- break;
- }
+ break;
+ }
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development