User: norbert
Date: 00/06/14 21:10:01
Added: src/java/org/spydermq/distributed/server
ConnectionReceiverOIL.java
ConnectionReceiverOILClient.java
Log:
no message
Revision Changes Path
1.1
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
Index: ConnectionReceiverOIL.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.distributed.server;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.spydermq.SpyConnection;
import org.spydermq.ConnectionQueue;
import org.spydermq.SpyMessage;
import org.spydermq.SpySession;
import org.spydermq.SessionQueue;
import org.spydermq.SpyDestination;
import org.spydermq.SpyTopicConnection;
import org.spydermq.SpyQueueSession;
import org.spydermq.Log;
import org.spydermq.NoReceiverException;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import java.util.Hashtable;
import java.util.HashSet;
import java.util.Iterator;
import java.rmi.RemoteException;
import java.net.ServerSocket;
import java.net.Socket;
import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.IOException;
/**
* The OIL implementation of the ConnectionReceiver object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class ConnectionReceiverOIL implements ConnectionReceiver, Runnable
{
// Attributes ----------------------------------------------------
//A link on my connection
private SpyConnection connection;
//Is my connection closed ?
private boolean closed;
// Constructor ---------------------------------------------------
public ConnectionReceiverOIL()
{
closed=false;
exportObject();
}
// Internals -----------------------------------------------------
// Should be hold in an "extention" and the ConnectionReceiver logic should be
in a separated object
// Why not generate a dynamic proxy for that :)
static final int RECEIVE = 1;
static final int RECEIVE_MULTIPLE = 2;
static final int DELETE_TEMPORARY_DESTINATION = 3;
static final int CLOSE = 4;
ServerSocket serverSocket;
public void exportObject()
{
try {
//Should be dynamic...
serverSocket = new ServerSocket(12345);
new Thread(this).start();
} catch (IOException e) {
Log.error("Bug");
}
}
public void run()
{
Socket socket = null;
int code = 0;
InputStream is=null;
OutputStream os=null;
ObjectOutputStream out=null;
ObjectInputStream in=null;
try {
socket = serverSocket.accept();
new Thread(this).start();
is=socket.getInputStream();
os=socket.getOutputStream();
out = new ObjectOutputStream(os);
in = new ObjectInputStream(is);
} catch (IOException e) {
Log.error("Bug");
}
while (true) {
Log.log("Wait for command");
try {
code=in.read();
} catch (IOException e) {
Log.error("Bug");
}
try {
Log.log("code = "+code);
switch (code)
{
case RECEIVE:
receive((SpyDestination)in.readObject(),(SpyMessage)in.readObject());
break;
case RECEIVE_MULTIPLE:
receiveMultiple((SpyDestination)in.readObject(),(SpyMessage[])in.readObject());
break;
case DELETE_TEMPORARY_DESTINATION:
deleteTemporaryDestination((SpyDestination)in.readObject());
break;
case CLOSE:
close();
break;
default:
throw new RemoteException("Bad method
code !");
}
//Everthing was OK
Log.log("OK !");
try {
os.write(0);
} catch (IOException e) {
Log.error("Bug");
}
} catch (Exception e) {
Log.log("Throw exception");
Log.error(e);
try {
os.write(1);
} catch (IOException e2) {
Log.error("Bug");
}
}
}
}
// Public --------------------------------------------------------
public void setConnection(SpyConnection connection)
{
this.connection=connection;
}
//A message has arrived for this Connection, We have to dispatch it to the
sessions
public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
Log.log("ConnectionReceiver:
Receive(Destination="+dest.toString()+",Mes="+mes.toString()+")");
if (connection instanceof SpyTopicConnection) {
//Get the set of subscribers for this Topic
ConnectionQueue
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
if (connectionQueue==null) return;
Iterator i=connectionQueue.subscribers.iterator();
while (i.hasNext()) {
SpySession session=(SpySession)i.next();
//add the new message to the session's queue
session.dispatchMessage(dest,mes);
//There is work to do...
session.mutex.notifyLock();
}
} else {
while (true) {
SessionQueue sq=null;
try {
//Find one session waiting for this Queue
if (connection.modeStop) throw new
Exception("This connection is stopped !");
ConnectionQueue
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
if (connectionQueue==null) throw new
Exception("There is no connectionQueue for this destination !");
synchronized (connectionQueue) {
//Find a SessionQueue
if
(connectionQueue.NumListeningSessions==0) throw new Exception("There are no listening
sessions for this destination !");
Iterator
i=connectionQueue.subscribers.iterator();
while (i.hasNext()) {
SpySession
session=(SpySession)i.next();
sq=(SessionQueue)session.destinations.get(dest);
if
(sq.NumListeningSubscribers!=0) break;
}
if
(sq==null||sq.NumListeningSubscribers==0) {
Log.error("FIXME: The
listeners count was invalid !");
throw new Exception("There are
no listening sessions for this destination !");
}
//Try with this sessionQueue
sq.dispatchMessage(dest,mes);
//Our work is done here
break;
}
} catch (NoReceiverException e) {
//This SessionQueue should not have been
registered !
continue;
} catch (Exception e) {
//This error is non-recoverable : we must
unregister from this queue
//Let the JMSServerQueue do its work
Log.log(e);
throw new NoReceiverException("There are no
listening sessions in this connection");
}
}
}
}
public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
JMSException
{
for(int i=0;i<mes.length;i++) {
receive(dest,mes[i]);
}
}
public void close() throws Exception
{
closed=true;
}
//One TemporaryDestination has been deleted
public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
{
if (closed) throw new IllegalStateException("The connection is
closed");
connection.deleteTemporaryDestination(dest);
}
}
1.1
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java
Index: ConnectionReceiverOILClient.java
===================================================================
package org.spydermq.distributed.server;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import org.spydermq.SpyDestination;
import org.spydermq.SpyMessage;
import org.spydermq.Log;
import org.spydermq.SpyConnection;
import java.rmi.RemoteException;
import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.IOException;
import java.net.Socket;
public class ConnectionReceiverOILClient
implements ConnectionReceiver
{
static final int RECEIVE = 1;
static final int RECEIVE_MULTIPLE = 2;
static final int DELETE_TEMPORARY_DESTINATION = 3;
static final int CLOSE = 4;
Socket socket;
InputStream is;
OutputStream os;
ObjectOutputStream out;
ObjectInputStream in;
ConnectionReceiverOILClient() throws RemoteException
{
try {
Socket socket=new Socket("localhost",12345);
is=socket.getInputStream();
os=socket.getOutputStream();
in=new ObjectInputStream(is);
out=new ObjectOutputStream(os);
} catch (Exception e) {
Log.error(e);
}
}
public void waitAnswer() throws Exception
{
try {
int val=is.read();
if (val==0) Log.log("Return : OK");
else {
Log.log("Return : Exception");
throw new Exception("Remote Exception");
}
} catch (IOException e) {
Log.error("IOException while reading the answer");
Log.error(e);
throw new RemoteException("Cannot contact the Distant object");
}
}
public void receive(SpyDestination dest,SpyMessage mes) throws Exception
{
os.write(RECEIVE);
out.writeObject(dest);
out.writeObject(mes);
waitAnswer();
}
public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
Exception
{
os.write(RECEIVE_MULTIPLE);
out.writeObject(dest);
out.writeObject(mes);
waitAnswer();
}
public void deleteTemporaryDestination(SpyDestination dest) throws Exception
{
os.write(DELETE_TEMPORARY_DESTINATION);
out.writeObject(dest);
waitAnswer();
}
public void close() throws Exception
{
os.write(CLOSE);
waitAnswer();
}
public void setConnection(SpyConnection connection) throws Exception
{
//SHOULD NOT be there !!!
}
}