User: hiram
Date: 01/02/14 19:33:06
Added: src/main/org/jbossmq/cluster Cluster.java
ClusterTesterReceiver.java ClusterTesterSender.java
InvalidConfigurationException.java
InvalidStateException.java SerializerUtil.java
TopicListener.java Transport.java
TransportListener.java
Log:
Implemented a multicast-based pub/sub subsystem targeted at building clustered
applications.
Revision Changes Path
1.1 jbossmq/src/main/org/jbossmq/cluster/Cluster.java
Index: Cluster.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.Iterator;
import java.io.ObjectOutputStream;
import java.io.ByteArrayInputStream;
import java.io.Externalizable;
import java.io.ObjectInputStream;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
/**
 * This class provides the interface to send messages to a cluster.
 * The goal was to implement a highly efficient messaging backbone for
 * inter-node communication of a cluster of JVMs.
 *
 * The messages can be either Serializable objects or Externalizable
 * objects. A custom externalizer class can be registered for a topic if a
 * highly efficient message format is required.
 *
 * This class behaves similarly to a TopicSession in JMS. It will broadcast
 * messages to all listeners of a topic in the cluster.
 * To make it efficient, topics are identified by a short!
 *
 * TopicListeners should not process the inbound messages in the onMessage(...)
 * method. They should queue the message so that it is processed async.
 * This will allow the cluster to dispatch messages to all the listeners quicker.
 *
 * The listener and externalizer registries are updated copy-on-write: a
 * writer clones the current map, changes the clone and then publishes it, so
 * dispatch code can iterate over the map it already holds without locking.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class Cluster {

    /** The management topic is reserved for the transport layer's use. */
    public static final short MANAGMENT_TOPIC = 0;

    /**
     * The node id of the local process. All processes in a cluster MUST
     * have unique node ids.
     */
    public short nodeId;

    // Writers must serialize on a lock that never changes. The maps below are
    // replaced on every update, so they cannot serve as their own monitors.
    private final Object topicListenersLock = new Object();
    private final Object customExternalizersLock = new Object();

    // Maps topic (Short) -> LinkedList of TopicListener.
    // Replaced as a whole on update; volatile so readers see the latest map.
    private volatile HashMap topicListeners = new HashMap();

    // Maps topic (Short) -> Class of the Externalizable used to decode
    // messages received on that topic. Replaced as a whole on update.
    private volatile HashMap customExternalizers = new HashMap();

    // The transport layer that the cluster will be using.
    private Transport transport;

    // This is the TransportListener the cluster hands to the transport.
    // Implemented as an inner class to keep the cluster's public interface
    // easy to use.
    private MyTransportListener transportListener = new MyTransportListener();

    /**
     * Receives raw messages from the transport, decodes them and dispatches
     * the resulting object to the local listeners of the topic.
     */
    private class MyTransportListener implements TransportListener {

        /**
         * Decodes a message that arrived from the network and delivers it to
         * every listener registered on the topic.
         *
         * If a custom externalizer class is registered for the topic, a new
         * instance of it is created and populated with readExternal(...);
         * otherwise the payload is read with readObject().
         *
         * Any exception raised while decoding or dispatching is printed and
         * swallowed, so a bad message does not propagate into the transport.
         *
         * @param topic    the topic the message was sent on
         * @param senderId the node id of the sender
         * @param message  the serialized message body
         */
        public void messageArrivedEvent(short topic, short senderId, byte message[])
            throws InterruptedException {

            Short id = new Short(topic);
            LinkedList listeners = (LinkedList) topicListeners.get(id);
            if (listeners == null) {
                // No listeners... skip the cost of decoding.
                return;
            }

            try {
                ObjectInputStream is =
                    new ObjectInputStream(new ByteArrayInputStream(message));

                Object o;
                Class externalizerClass = (Class) customExternalizers.get(id);
                if (externalizerClass != null) {
                    Externalizable s = (Externalizable) externalizerClass.newInstance();
                    s.readExternal(is);
                    o = s;
                } else {
                    o = is.readObject();
                }

                Iterator i = listeners.iterator();
                while (i.hasNext()) {
                    TopicListener c = (TopicListener) i.next();
                    c.onMessage(topic, senderId, o);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Dumb constructor.
     * You must initialize the cluster before using it.
     * (Sending messages, starting the transport layer)
     */
    public Cluster() {
        super();
    }

    /**
     * Allows us to change the transport protocol used for the cluster.
     * The transport is given this cluster and the cluster's internal
     * TransportListener.
     *
     * @param newTransport the transport to use; must not be null
     */
    public void setTransport(Transport newTransport) {
        transport = newTransport;
        transport.setTransportListener(transportListener);
        transport.setCluster(this);
    }

    /**
     * Every node in the cluster has to have a unique id.
     *
     * @param newNodeId the id of the local node
     */
    public void setNodeId(short newNodeId) {
        nodeId = newNodeId;
    }

    /**
     * The id of this node of the cluster.
     *
     * @return the local node id
     */
    public short getNodeId() {
        return nodeId;
    }

    /**
     * The transport being used by the cluster.
     *
     * @return Transport, or null if none has been set
     */
    public Transport getTransport() {
        return transport;
    }

    /**
     * Send a message to the cluster on the given channel.
     * The message will also be sent to the local node.
     * The message will be marked as not droppable and order-preserving.
     *
     * @param topic   the topic to send on
     * @param message the message to send
     */
    public void send(short topic, Object message)
        throws IOException, InterruptedException {
        // Send w/ local broadcast && non droppable && keep order
        send(topic, message, true, false, true);
    }

    /**
     * Registers a TopicListener to receive messages on a topic.
     *
     * @param topic the topic to listen on
     * @param c     the listener to add
     */
    public void addTopicListener(short topic, TopicListener c) {
        Short id = new Short(topic);
        // Write Lock
        synchronized (topicListenersLock) {
            // We do not modify the current map or listener list
            // so that iterators already running over them do not fail.
            LinkedList current = (LinkedList) topicListeners.get(id);
            LinkedList updated;
            if (current == null) {
                updated = new LinkedList();
            } else {
                updated = (LinkedList) current.clone();
            }
            updated.add(c);

            HashMap t = (HashMap) topicListeners.clone();
            t.put(id, updated);
            topicListeners = t;
        }
    }

    /**
     * Unregisters a TopicListener from a topic.
     * Does nothing if the listener was not registered on the topic.
     *
     * @param topic the topic the listener was registered on
     * @param c     the listener to remove
     */
    public void removeTopicListener(short topic, TopicListener c) {
        Short id = new Short(topic);
        // Write Lock
        synchronized (topicListenersLock) {
            // We do not modify the current map or listener list
            // so that iterators already running over them do not fail.
            LinkedList current = (LinkedList) topicListeners.get(id);
            if (current == null) {
                // Listener had not been added
                return;
            }

            LinkedList updated = (LinkedList) current.clone();
            if (!updated.remove(c)) {
                // Listener had not been added
                return;
            }

            HashMap t = (HashMap) topicListeners.clone();
            if (updated.size() > 0) {
                t.put(id, updated);
            } else {
                // Last listener gone: drop the topic so isListeningOn() is false.
                t.remove(id);
            }
            topicListeners = t;
        }
    }

    /**
     * Are there any local listeners on the given topic?
     *
     * @param topic the topic to check
     * @return true if at least one listener is registered on the topic
     */
    public boolean isListeningOn(short topic) {
        return topicListeners.get(new Short(topic)) != null;
    }

    /**
     * Adds a custom externalizer for a channel.
     *
     * Using a custom externalizer for a channel could give BIG performance
     * gains since the type of object that will be sent over the channel will be
     * FIXED.
     *
     * @param topic the topic the externalizer applies to
     * @param c     the Externalizable class instantiated to decode messages
     *              received on the topic
     */
    public void addCustomExternalizer(short topic, Class c) {
        Short id = new Short(topic);
        // Write Lock
        synchronized (customExternalizersLock) {
            // Clone the externalizer map itself (not the listener map) so
            // existing registrations are kept and no listener lists leak in.
            HashMap t = (HashMap) customExternalizers.clone();
            t.put(id, c);
            customExternalizers = t;
        }
    }

    /**
     * Removes the custom externalizer for the channel.
     *
     * @param topic the topic whose externalizer is removed
     * @param c     not used; whatever class is registered for the topic is removed
     */
    public void removeCustomExternalizer(short topic, Class c) {
        Short id = new Short(topic);
        // Write Lock
        synchronized (customExternalizersLock) {
            HashMap t = (HashMap) customExternalizers.clone();
            t.remove(id);
            customExternalizers = t;
        }
    }

    /**
     * Used internally to send a message to all the local listeners.
     *
     * @param topic   the topic the message belongs to
     * @param sender  the node id reported to the listeners as the sender
     * @param message the message object, delivered as is
     */
    private void messageArrivedEvent(short topic, short sender, Object message)
        throws InterruptedException {
        LinkedList listeners = (LinkedList) topicListeners.get(new Short(topic));
        if (listeners == null) {
            // No listeners...
            return;
        }
        Iterator i = listeners.iterator();
        while (i.hasNext()) {
            TopicListener c = (TopicListener) i.next();
            c.onMessage(topic, sender, message);
        }
    }

    /**
     * Send a message to the cluster on the given channel.
     * The local node will also get the message if localBroadcast is true;
     * local listeners receive the original object, before it is serialized.
     * The droppable and keepOrder flags are passed on to the transport layer.
     *
     * @param topic          the topic to send on
     * @param message        the message to send
     * @param localBroadcast whether local listeners should also get the message
     * @param droppable      hint to the transport layer
     * @param keepOrder      hint to the transport layer
     * @throws IOException if the message cannot be serialized, or if the
     *                     transport reports an I/O failure
     */
    public void send(
        short topic,
        Object message,
        boolean localBroadcast,
        boolean droppable,
        boolean keepOrder)
        throws IOException, InterruptedException {

        if (localBroadcast) {
            messageArrivedEvent(topic, getNodeId(), message);
        }

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream os = new ObjectOutputStream(bos);
        // NOTE(review): any Externalizable is written with writeExternal(),
        // but the receiving side only uses readExternal() when a custom
        // externalizer is registered for the topic. Confirm that callers
        // always register one before sending Externalizable messages.
        if (message instanceof Externalizable) {
            ((Externalizable) message).writeExternal(os);
        } else {
            os.writeObject(message);
        }
        os.close();

        byte data[] = bos.toByteArray();
        transport.send(topic, data, droppable, keepOrder);
    }
}
1.1 jbossmq/src/main/org/jbossmq/cluster/ClusterTesterReceiver.java
Index: ClusterTesterReceiver.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster;
import EDU.oswego.cs.dl.util.concurrent.*;
import org.jbossmq.cluster.udp.UDPTransport;
/**
 * Manual test program for the receiving side of a cluster.
 *
 * It configures a UDPTransport from the "receiver.txt" classpath resource,
 * registers a listener on topic 13 that prints every message it gets, starts
 * the transport and then blocks reading from a local queue.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class ClusterTesterReceiver {

    /**
     * Starts the receiver.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) throws Exception {
        Cluster cluster = new Cluster();
        UDPTransport transport = new UDPTransport();

        // Load the transport settings from the classpath.
        java.util.Properties settings = new java.util.Properties();
        java.io.InputStream in =
            transport.getClass().getClassLoader().getResourceAsStream("receiver.txt");
        settings.load(in);
        in.close();

        transport.setProperties(settings);
        String configuredNodeId = settings.getProperty("NodeId");
        cluster.setNodeId(Short.parseShort(configuredNodeId));
        cluster.setTransport(transport);

        short echoChannelId = 13;
        final Channel echoChannel = new BoundedLinkedQueue(10);

        TopicListener printer = new TopicListener() {
            public void onMessage(short topic, short sender, Object m) {
                try {
                    System.out.println("Got :" + m);
                    // Handing the message to echoChannel is currently disabled:
                    //echoChannel.put( m );
                } catch (Exception ignored) {
                    // Failures while printing are deliberately dropped.
                }
            }
        };
        cluster.addTopicListener(echoChannelId, printer);

        transport.start();
        System.out.println("Waiting for messages");

        // Nothing in this class puts into echoChannel, so take() is expected
        // to block; the listener above does the actual printing.
        for (;;) {
            String received = (String) echoChannel.take();
            System.out.println("Got :" + received);
        }
    }
}
1.1 jbossmq/src/main/org/jbossmq/cluster/ClusterTesterSender.java
Index: ClusterTesterSender.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster;
import EDU.oswego.cs.dl.util.concurrent.*;
import org.jbossmq.cluster.udp.UDPTransport;
/**
 * Manual test program for the sending side of a cluster.
 *
 * It configures a UDPTransport from the "sender.txt" classpath resource,
 * starts it, sends 500 numbered "Hello World" messages on topic 13 and then
 * waits on the cluster object.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class ClusterTesterSender {

    /**
     * Starts the application.
     *
     * @param args an array of command-line arguments (not used)
     */
    public static void main(String[] args) throws Exception {
        Cluster cluster = new Cluster();
        UDPTransport transport = new UDPTransport();

        // Load the transport settings from the classpath.
        java.util.Properties settings = new java.util.Properties();
        java.io.InputStream in =
            transport.getClass().getClassLoader().getResourceAsStream("sender.txt");
        settings.load(in);
        in.close();

        transport.setProperties(settings);
        String configuredNodeId = settings.getProperty("NodeId");
        cluster.setNodeId(Short.parseShort(configuredNodeId));
        cluster.setTransport(transport);
        transport.start();

        short echoChannelId = 13;
        String greeting = "Hello World";
        int sent = 0;
        while (sent < 500) {
            cluster.send(echoChannelId, greeting + " #" + sent);
            sent++;
        }
        System.out.println("Message, sent");

        // Park the main thread; nothing in this class calls notify on the cluster.
        synchronized (cluster) {
            cluster.wait();
        }
    }
}
1.1
jbossmq/src/main/org/jbossmq/cluster/InvalidConfigurationException.java
Index: InvalidConfigurationException.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster;
import java.lang.Exception;
/**
 * Checked exception type named for invalid-configuration conditions in the
 * cluster package.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class InvalidConfigurationException extends Exception {

    /**
     * Creates the exception without a detail message.
     */
    public InvalidConfigurationException() {
    }

    /**
     * Creates the exception with a detail message.
     *
     * @param s the detail message, later available through getMessage()
     */
    public InvalidConfigurationException(String s) {
        super(s);
    }
}
1.1 jbossmq/src/main/org/jbossmq/cluster/InvalidStateException.java
Index: InvalidStateException.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster;
import java.lang.Exception;
/**
 * Checked exception type named for invalid-state conditions in the cluster
 * package.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class InvalidStateException extends Exception {

    /**
     * Creates the exception without a detail message.
     */
    public InvalidStateException() {
    }

    /**
     * Creates the exception with a detail message.
     *
     * @param s the detail message, later available through getMessage()
     */
    public InvalidStateException(String s) {
        super(s);
    }
}
1.1 jbossmq/src/main/org/jbossmq/cluster/SerializerUtil.java
Index: SerializerUtil.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster;
import java.lang.Object;
/**
 * This is a utility class which holds functions useful for converting
 * between Java primitive types and byte[]s.
 *
 * All values are stored big-endian (most significant byte first). None of
 * the methods check bounds themselves; an offset too close to the end of the
 * array results in an ArrayIndexOutOfBoundsException.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class SerializerUtil {

    /**
     * This is used for unit testing the class: it round-trips a range of int
     * and long values through a buffer and prints each original/result pair.
     *
     * @param args java.lang.String[] (not used)
     */
    public static void main(String[] args) {
        byte data[] = new byte[8];
        for (int i = -10; i < 150; i++) {
            writeIntTo(i, data, 0);
            int r = readIntFrom(data, 0);
            System.out.println("Int :" + i + "==" + r);
        }
        for (long i = -10; i < 150; i++) {
            writeLongTo(i, data, 0);
            long r = readLongFrom(data, 0);
            System.out.println("Long :" + i + "==" + r);
        }
    }

    /**
     * Reads a big-endian int from 4 bytes.
     *
     * @param data   the buffer to read from
     * @param offset index of the most significant byte
     * @return the decoded value
     */
    public static int readIntFrom(byte[] data, int offset) {
        return (
            ((data[offset] & 0xFF) << 24)
                | ((data[offset + 1] & 0xFF) << 16)
                | ((data[offset + 2] & 0xFF) << 8)
                | ((data[offset + 3] & 0xFF) << 0));
    }

    /**
     * Reads a big-endian long from 8 bytes.
     *
     * @param data   the buffer to read from
     * @param offset index of the most significant byte
     * @return the decoded value
     */
    public static long readLongFrom(byte[] data, int offset) {
        return (
            ((long) (data[offset] & 0xFF) << 56)
                | ((long) (data[offset + 1] & 0xFF) << 48)
                | ((long) (data[offset + 2] & 0xFF) << 40)
                | ((long) (data[offset + 3] & 0xFF) << 32)
                | ((long) (data[offset + 4] & 0xFF) << 24)
                | ((long) (data[offset + 5] & 0xFF) << 16)
                | ((long) (data[offset + 6] & 0xFF) << 8)
                | ((long) (data[offset + 7] & 0xFF) << 0));
    }

    /**
     * Reads a big-endian short from 2 bytes.
     *
     * @param data   the buffer to read from
     * @param offset index of the most significant byte
     * @return the decoded value
     */
    public static short readShortFrom(byte[] data, int offset) {
        // Mask each byte BEFORE shifting: masking the shifted high byte with
        // 0xFF would always yield 0 and lose the upper 8 bits of the value.
        return (short)
            (((data[offset] & 0xFF) << 8) | (data[offset + 1] & 0xFF));
    }

    /**
     * Writes an int as 4 big-endian bytes.
     *
     * @param v    the value to write
     * @param data the buffer to write into
     * @param pos  index at which the most significant byte is stored
     */
    static public void writeIntTo(int v, byte[] data, int pos) {
        data[pos] = (byte) ((v >>> 24) & 0xFF);
        data[pos + 1] = (byte) ((v >>> 16) & 0xFF);
        data[pos + 2] = (byte) ((v >>> 8) & 0xFF);
        data[pos + 3] = (byte) ((v >>> 0) & 0xFF);
    }

    /**
     * Writes a long as 8 big-endian bytes.
     *
     * @param v    the value to write
     * @param data the buffer to write into
     * @param pos  index at which the most significant byte is stored
     */
    static public void writeLongTo(long v, byte data[], int pos) {
        data[pos] = (byte) ((v >>> 56) & 0xFF);
        data[pos + 1] = (byte) ((v >>> 48) & 0xFF);
        data[pos + 2] = (byte) ((v >>> 40) & 0xFF);
        data[pos + 3] = (byte) ((v >>> 32) & 0xFF);
        data[pos + 4] = (byte) ((v >>> 24) & 0xFF);
        data[pos + 5] = (byte) ((v >>> 16) & 0xFF);
        data[pos + 6] = (byte) ((v >>> 8) & 0xFF);
        data[pos + 7] = (byte) ((v >>> 0) & 0xFF);
    }

    /**
     * Writes a short as 2 big-endian bytes.
     *
     * @param v    the value to write
     * @param data the buffer to write into
     * @param pos  index at which the most significant byte is stored
     */
    public static void writeShortTo(short v, byte[] data, int pos) {
        data[pos] = (byte) ((v >>> 8) & 0xFF);
        data[pos + 1] = (byte) ((v >>> 0) & 0xFF);
    }
}
1.1 jbossmq/src/main/org/jbossmq/cluster/TopicListener.java
Index: TopicListener.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster;
import java.lang.Object;
/**
 * Callback interface that clients implement in order to receive messages
 * from the cluster.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 *
 */
public interface TopicListener {

    /**
     * Called for each message delivered to this listener.
     *
     * @param topic   the topic the message was sent on
     * @param sender  the node id of the sender
     * @param message the message object
     */
    void onMessage(short topic, short sender, Object message);
}
1.1 jbossmq/src/main/org/jbossmq/cluster/Transport.java
Index: Transport.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster;
import java.io.IOException;
/**
 * Contract that every transport implementation used by a Cluster must fulfil.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 *
 */
public interface Transport {

    /**
     * Tells the transport which cluster it belongs to.
     *
     * @param c the owning cluster
     */
    void setCluster(Cluster c);

    /**
     * Sets the listener that is to be told about messages arriving from the
     * network.
     *
     * @param c the listener to notify
     */
    void setTransportListener(TransportListener c);

    /**
     * Sends an already serialized message on a channel.
     *
     * @param channelId the channel (topic) to send on
     * @param data      the serialized message body
     * @param droppable hint passed along by the cluster
     * @param keepOrder hint passed along by the cluster
     */
    void send(short channelId, byte[] data, boolean droppable, boolean keepOrder)
        throws IOException, InterruptedException;
}
1.1 jbossmq/src/main/org/jbossmq/cluster/TransportListener.java
Index: TransportListener.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster;
import java.lang.*;
import java.lang.Object;
/**
 * Callback interface through which a Transport signals the cluster that a
 * message has arrived from the network.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public interface TransportListener {

    /**
     * Called when a message has arrived.
     *
     * @param channelId the channel (topic) the message was received on
     * @param senderId  the node id of the sender
     * @param message   the serialized message body
     */
    void messageArrivedEvent(short channelId, short senderId, byte[] message)
        throws InterruptedException;
}