User: hiram
Date: 01/02/27 20:35:00
Modified: src/main/org/jbossmq/cluster Cluster.java
InvalidConfigurationException.java
InvalidStateException.java NodeId.java
SerializerUtil.java TopicListener.java
Transport.java TransportListener.java
Added: src/main/org/jbossmq/cluster TestReceiver.java
TestSender.java
Removed: src/main/org/jbossmq/cluster ClusterTesterReceiver.java
ClusterTesterSender.java
Log:
We can now send point-to-point or broadcast messages in the cluster. Refactored a lot
of the UDP implementation.
Revision Changes Path
1.3 +42 -0 jbossmq/src/main/org/jbossmq/cluster/Cluster.java
Index: Cluster.java
===================================================================
RCS file: /products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/Cluster.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- Cluster.java 2001/02/22 06:12:26 1.2
+++ Cluster.java 2001/02/28 04:34:56 1.3
@@ -36,7 +36,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class Cluster {
@@ -286,5 +286,47 @@
TopicListener c = (TopicListener) i.next();
c.onMessage(topic, sender, message);
}
+ }
+
+ /**
+ * Send a message to the cluster on the given channel.
+ * The message will also be sent to the local node.
+ * The message will be marked as not droppable.
+ */
+ public void send(NodeId dest, short topic, Object message)
+ throws IOException, InterruptedException {
+ // Send w/ local broacast && non droppable
+ send(dest, topic, message, true, false, true);
+ }
+
+ /**
+ * Send a message to the node on the given channel.
+ * The local node will also get the message if localBroadcast is true.
+ * The droppable flag is hint to the transport layer.
+ */
+ public void send(NodeId dest,
+ short topic,
+ Object message,
+ boolean localBroadcast,
+ boolean droppable,
+ boolean keepOrder)
+ throws IOException, InterruptedException {
+
+ if (localBroadcast)
+ messageArrivedEvent(topic, null, message);
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream os = new ObjectOutputStream(bos);
+
+ if (message instanceof Externalizable) {
+ ((Externalizable) message).writeExternal(os);
+ } else {
+ os.writeObject(message);
+ }
+ os.close();
+
+ byte data[] = bos.toByteArray();
+ transport.send(dest, topic, data, droppable, keepOrder);
+
}
}
1.3 +0 -0
jbossmq/src/main/org/jbossmq/cluster/InvalidConfigurationException.java
Index: InvalidConfigurationException.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/InvalidConfigurationException.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- InvalidConfigurationException.java 2001/02/22 06:12:26 1.2
+++ InvalidConfigurationException.java 2001/02/28 04:34:57 1.3
@@ -11,7 +11,7 @@
/**
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class InvalidConfigurationException extends Exception {
1.3 +0 -0 jbossmq/src/main/org/jbossmq/cluster/InvalidStateException.java
Index: InvalidStateException.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/InvalidStateException.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- InvalidStateException.java 2001/02/22 06:12:26 1.2
+++ InvalidStateException.java 2001/02/28 04:34:57 1.3
@@ -11,7 +11,7 @@
/**
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class InvalidStateException extends Exception {
1.2 +0 -0 jbossmq/src/main/org/jbossmq/cluster/NodeId.java
Index: NodeId.java
===================================================================
RCS file: /products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/NodeId.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- NodeId.java 2001/02/22 06:12:26 1.1
+++ NodeId.java 2001/02/28 04:34:58 1.2
@@ -6,7 +6,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public interface NodeId {
}
1.3 +0 -0 jbossmq/src/main/org/jbossmq/cluster/SerializerUtil.java
Index: SerializerUtil.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/SerializerUtil.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SerializerUtil.java 2001/02/22 06:12:26 1.2
+++ SerializerUtil.java 2001/02/28 04:34:58 1.3
@@ -14,7 +14,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SerializerUtil {
1.3 +0 -0 jbossmq/src/main/org/jbossmq/cluster/TopicListener.java
Index: TopicListener.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/TopicListener.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- TopicListener.java 2001/02/22 06:12:27 1.2
+++ TopicListener.java 2001/02/28 04:34:58 1.3
@@ -14,7 +14,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*
*/
public interface TopicListener {
1.3 +1 -0 jbossmq/src/main/org/jbossmq/cluster/Transport.java
Index: Transport.java
===================================================================
RCS file: /products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/Transport.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- Transport.java 2001/02/22 06:12:27 1.2
+++ Transport.java 2001/02/28 04:34:58 1.3
@@ -13,7 +13,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*
*/
public interface Transport {
@@ -22,4 +22,5 @@
public void setTransportListener(TransportListener c);
public void send(short channelId, byte data[], boolean droppable, boolean
keepOrder) throws IOException, InterruptedException;
+ public void send(NodeId dest, short channelId, byte data[], boolean droppable,
boolean keepOrder) throws IOException, InterruptedException;
}
1.3 +0 -0 jbossmq/src/main/org/jbossmq/cluster/TransportListener.java
Index: TransportListener.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/TransportListener.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- TransportListener.java 2001/02/22 06:12:27 1.2
+++ TransportListener.java 2001/02/28 04:34:58 1.3
@@ -16,7 +16,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public interface TransportListener {
1.1 jbossmq/src/main/org/jbossmq/cluster/TestReceiver.java
Index: TestReceiver.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster;
import EDU.oswego.cs.dl.util.concurrent.*;
import org.jbossmq.cluster.udp.UDPTransport;
/**
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class TestReceiver {

    /**
     * Manual test entry point: configures a multicast UDP transport
     * (group 225.1.1.1, port 1000), registers a listener on topic 13 and
     * prints every message delivered to that listener.
     *
     * The method never returns normally; after start-up the main thread
     * blocks on the echo queue.
     *
     * @param args command-line arguments (not used)
     * @throws Exception propagated from transport/cluster set-up or from the
     *         blocking take() on the echo queue
     */
    public static void main(java.lang.String[] args) throws Exception {
        Cluster cluster = new Cluster();
        UDPTransport transport = new UDPTransport();

        java.util.Properties p = new java.util.Properties();
        p.setProperty("TransportMode","Multicast");
        p.setProperty("Group","225.1.1.1");
        p.setProperty("Port", "1000");
        transport.setProperties(p);
        cluster.setTransport(transport);

        short echoChannelId = 13;
        final Channel echoChannel = new BoundedLinkedQueue(10);

        TopicListener myListener = new TopicListener() {
            public void onMessage(short topic, NodeId sender, Object m) {
                try {
                    System.out.println("Got :" + m);
                    // Hand-off to the main thread is currently disabled:
                    //echoChannel.put( m );
                } catch (Exception e) {
                    // Keep the failure from escaping into whoever invoked the
                    // listener, but report it instead of silently dropping it.
                    System.err.println("Failed to handle message on topic " + topic + ": " + e);
                    e.printStackTrace();
                }
            }
        };

        // Register the listener before starting the transport.
        cluster.addTopicListener(echoChannelId, myListener);
        transport.start();

        System.out.println("Waiting for messages");
        // NOTE(review): nothing feeds echoChannel while the put above is
        // commented out, so this take() presumably blocks indefinitely and the
        // loop only serves to keep the main thread alive. Confirm before
        // re-enabling the hand-off (the listener would then print each
        // message a second time here).
        while (true) {
            String t = (String) echoChannel.take();
            System.out.println("Got :" + t);
        }
    }
}
1.1 jbossmq/src/main/org/jbossmq/cluster/TestSender.java
Index: TestSender.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster;
import EDU.oswego.cs.dl.util.concurrent.*;
import org.jbossmq.cluster.udp.UDPTransport;
/**
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class TestSender {

    /**
     * Manual test entry point: configures a multicast UDP transport
     * (group 225.1.1.1, port 1000), starts it, sends 1000 numbered
     * "Hello World" strings on topic 13 and then parks the main thread.
     *
     * @param args an array of command-line arguments (not used)
     * @throws Exception propagated from transport/cluster set-up, from
     *         sending, or if the main thread is interrupted while parked
     */
    public static void main(java.lang.String[] args) throws Exception {
        Cluster cluster = new Cluster();
        UDPTransport transport = new UDPTransport();

        java.util.Properties p = new java.util.Properties();
        p.setProperty("TransportMode","Multicast");
        p.setProperty("Group","225.1.1.1");
        p.setProperty("Port", "1000");
        transport.setProperties(p);
        cluster.setTransport(transport);
        transport.start();

        short echoChannelId = 13;
        String t = "Hello World";
        for (int i = 0; i < 1000; i++) {
            cluster.send(echoChannelId, t + " #" + i);
        }
        System.out.println("Message, sent");

        // Park the main thread so the process stays up after sending
        // (presumably to let the transport finish its work; nothing in this
        // class ever notifies this monitor). Object.wait() may return
        // spuriously, so it must be called in a loop; a single bare wait()
        // could let main() return unexpectedly.
        synchronized (cluster) {
            while (true) {
                cluster.wait();
            }
        }
    }
}