User: hiram
Date: 00/12/31 15:45:43
Added: examples MyMessageTopicExample.java MyMessage.java
Log:
Implemented Non-optimized message delivery.
Revision Changes Path
1.1 spyderMQ/examples/MyMessageTopicExample.java
Index: MyMessageTopicExample.java
===================================================================
/*
* @(#)SynchTopicExample.java 1.7 00/08/18
*
* Copyright (c) 2000 Sun Microsystems, Inc. All Rights Reserved.
*
* Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,
* modify and redistribute this software in source and binary code form,
* provided that i) this copyright notice and license appear on all copies of
* the software; and ii) Licensee does not utilize the software in a manner
* which is disparaging to Sun.
*
* This software is provided "AS IS," without a warranty of any kind. ALL
* EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY
* IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR
* NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE
* LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS
* LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,
* INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER
* CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF
* OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGES.
*
* This software is not designed or intended for use in on-line control of
* aircraft, air traffic, aircraft navigation or aircraft communications; or in
* the design, construction, operation or maintenance of any nuclear
* facility. Licensee represents and warrants that it will not use or
* redistribute the Software for such purposes.
*/
import javax.jms.*;
/**
* The SynchTopicExample class demonstrates the simplest form of the
* publish/subscribe model: the publisher publishes a custom message (MyMessage),
and the
* subscriber reads it using a synchronous receive.
* <p>
* The program contains a SimplePublisher class, a SynchSubscriber class, a
* main method, and a method that runs the subscriber and publisher
* threads.
* <p>
* Specify a topic name on the command line when you run the program.
* <p>
* The program calls methods in the SampleUtilities class.
*
* @author Kim Haase
* @version 1.7, 08/18/00
*/
public class MyMessageTopicExample {
String topicName = null;
int exitResult = 0;
/**
* The SynchSubscriber class fetches a single message from a topic using
* synchronous message delivery.
*
* @author Kim Haase
* @version 1.7, 08/18/00
*/
public class SynchSubscriber extends Thread {
/**
* Runs the thread.
*/
public void run() {
TopicConnectionFactory topicConnectionFactory = null;
TopicConnection topicConnection = null;
TopicSession topicSession = null;
Topic topic = null;
TopicSubscriber topicSubscriber = null;
final boolean NOLOCAL = true;
MyMessage inMessage = null;
MyMessage outMessage = null;
TopicPublisher topicPublisher = null;
/*
* Obtain connection factory.
* Create connection.
* Create session from connection; false means session is not
* transacted.
* Obtain topic name.
*/
try {
topicConnectionFactory =
SampleUtilities.getTopicConnectionFactory();
topicConnection =
topicConnectionFactory.createTopicConnection();
topicSession =
topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
topic = SampleUtilities.getTopic(topicName,
topicSession);
} catch (Exception e) {
System.out.println("Connection problem: " +
e.toString());
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException ee) {}
}
System.exit(1);
}
/*
* Create subscriber, then start message delivery. Subscriber
is
* non-local so that it won't receive the message we publish.
* Wait for text message to arrive, then display its contents.
* Close connection and exit.
*/
try {
topicSubscriber =
topicSession.createSubscriber(topic, null,
NOLOCAL);
topicConnection.start();
inMessage = (MyMessage)topicSubscriber.receive();
System.out.println("SUBSCRIBER THREAD: Reading
message: "
+
inMessage.getText());
/*
* Notify publisher that we received a message and it
* can stop broadcasting.
*/
topicPublisher = topicSession.createPublisher(topic);
outMessage = new MyMessage();
outMessage.setText("Done");
topicPublisher.publish(outMessage);
} catch (JMSException e) {
System.out.println("Exception occurred: " +
e.toString());
exitResult = 1;
} finally {
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException e) {
exitResult = 1;
}
}
}
}
}
/**
* The SimplePublisher class publishes a single message to a topic.
*
* @author Kim Haase
* @version 1.7, 08/18/00
*/
public class SimplePublisher extends Thread {
/**
* Runs the thread.
*/
public void run() {
TopicConnectionFactory topicConnectionFactory = null;
TopicConnection topicConnection = null;
TopicSession topicSession = null;
Topic topic = null;
TopicSubscriber publisherControlSubscriber = null;
final boolean NOLOCAL = true;
TopicPublisher topicPublisher = null;
MyMessage sentMessage = null;
final String MSG_TEXT = new String("Here is a
message ");
Message receivedMessage = null;
/*
* Obtain connection factory.
* Create connection.
* Create session from connection; false means session is not
* transacted.
* Obtain topic name.
*/
try {
topicConnectionFactory =
SampleUtilities.getTopicConnectionFactory();
topicConnection =
topicConnectionFactory.createTopicConnection();
topicSession =
topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
topic = SampleUtilities.getTopic(topicName,
topicSession);
} catch (Exception e) {
System.out.println("Connection problem: " +
e.toString());
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException ee) {}
}
System.exit(1);
}
/*
* Create non-local subscriber to receive "Done" message from
* another connection; start delivery.
* Create publisher and text message.
* Set message text, display it, and publish message.
* Close connection and exit.
*/
try {
publisherControlSubscriber =
topicSession.createSubscriber(topic, null,
NOLOCAL);
topicConnection.start();
/*
* Publish a message once per second until subscriber
* reports that it has finished receiving messages.
*/
topicPublisher = topicSession.createPublisher(topic);
sentMessage = new MyMessage();
for (int i = 1; receivedMessage == null; i++) {
sentMessage.setText(MSG_TEXT + i);
System.out.println("PUBLISHER THREAD:
Publishing message: "
+
sentMessage.getText());
topicPublisher.publish(sentMessage);
try { Thread.sleep(1000); } catch
(InterruptedException ie){}
receivedMessage =
publisherControlSubscriber.receiveNoWait();
}
} catch (JMSException e) {
System.out.println("Exception occurred: " +
e.toString());
exitResult = 1;
} finally {
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException e) {
exitResult = 1;
}
}
}
}
}
/**
* Instantiates the subscriber and publisher classes and starts their
* threads.
* Calls the join method to wait for the threads to die.
* <p>
* It is essential to start the subscriber before starting the publisher.
* In the publish/subscribe model, a subscriber can ordinarily receive only
* messages published while it is active.
*/
public void run_threads() {
SynchSubscriber synchSubscriber = new SynchSubscriber();
SimplePublisher simplePublisher = new SimplePublisher();
synchSubscriber.start();
simplePublisher.start();
try {
synchSubscriber.join();
simplePublisher.join();
} catch (InterruptedException e) {}
}
/**
* Reads the topic name from the command line and displays it. The
* topic must have been created by the jmsadmin tool.
* Calls the run_threads method to execute the program threads.
* Exits program.
*
* @param args the topic used by the example
*/
public static void main(String[] args) {
SynchTopicExample ste = new SynchTopicExample();
if (args.length != 1) {
System.out.println("Usage: java SynchTopicExample <topic_name>");
System.exit(1);
}
ste.topicName = new String(args[0]);
System.out.println("Topic name is " + ste.topicName);
ste.run_threads();
SampleUtilities.exit(ste.exitResult);
}
}
1.1 spyderMQ/examples/MyMessage.java
Index: MyMessage.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
import javax.jms.Message;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
import javax.jms.Destination;
import java.util.Enumeration;
import java.util.Hashtable;
import java.io.Serializable;
import java.lang.Comparable;
/**
* This class implements javax.jms.Message
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class MyMessage implements Serializable, Message {
// Constants -----------------------------------------------------
static final int DEFAULT_DELIVERY_MODE = -1;
static final int DEFAULT_PRIORITY = -1;
static final int DEFAULT_TIME_TO_LIVE = -1;
//Those attributes are not transient ---------------
//Header fields
//Set by send() method
private Destination jmsDestination = null;
private int jmsDeliveryMode = -1;
private long jmsExpiration = 0;
private int jmsPriority = -1;
private String jmsMessageID = null;
private long jmsTimeStamp = 0;
//Set by the client
private boolean jmsCorrelationID = true;
private String jmsCorrelationIDString = null;
private byte[] jmsCorrelationIDbyte = null;
private Destination jmsReplyTo = null;
private String jmsType = null;
//Set by the provider
private boolean jmsRedelivered = false;
//Properties
private Hashtable prop;
public boolean propReadWrite;
//Message body
public boolean msgReadOnly = false;
// Constructor ---------------------------------------------------
MyMessage() {
prop = new Hashtable();
propReadWrite = true;
}
// Public --------------------------------------------------------
public String getJMSMessageID() throws JMSException {
return jmsMessageID;
}
public void setJMSMessageID(String id) throws JMSException {
jmsMessageID = id;
}
public long getJMSTimestamp() throws JMSException {
return jmsTimeStamp;
}
public void setJMSTimestamp(long timestamp) throws JMSException {
jmsTimeStamp = timestamp;
}
public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
if (jmsCorrelationID)
throw new JMSException("JMSCorrelationID is a string");
return jmsCorrelationIDbyte;
}
public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws
JMSException {
jmsCorrelationID = false;
jmsCorrelationIDbyte = (byte[]) correlationID.clone();
jmsCorrelationIDString = null;
}
public void setJMSCorrelationID(String correlationID) throws JMSException {
jmsCorrelationID = true;
jmsCorrelationIDString = correlationID;
jmsCorrelationIDbyte = null;
}
public String getJMSCorrelationID() throws JMSException {
if (!jmsCorrelationID)
throw new JMSException("JMSCorrelationID is an array");
return jmsCorrelationIDString;
}
public Destination getJMSReplyTo() throws JMSException {
return jmsReplyTo;
}
public void setJMSReplyTo(Destination replyTo) throws JMSException {
jmsReplyTo = replyTo;
}
public Destination getJMSDestination() throws JMSException {
return jmsDestination;
}
public void setJMSDestination(Destination destination) throws JMSException {
jmsDestination = destination;
}
public int getJMSDeliveryMode() throws JMSException {
return jmsDeliveryMode;
}
public void setJMSDeliveryMode(int deliveryMode) throws JMSException {
jmsDeliveryMode = deliveryMode;
}
public boolean getJMSRedelivered() throws JMSException {
return jmsRedelivered;
}
public void setJMSRedelivered(boolean redelivered) throws JMSException {
jmsRedelivered = redelivered;
}
public String getJMSType() throws JMSException {
return jmsType;
}
public void setJMSType(String type) throws JMSException {
jmsType = type;
}
public long getJMSExpiration() throws JMSException {
return jmsExpiration;
}
public void setJMSExpiration(long expiration) throws JMSException {
jmsExpiration = expiration;
}
public int getJMSPriority() throws JMSException {
return jmsPriority;
}
public void setJMSPriority(int priority) throws JMSException {
jmsPriority = priority;
}
public void clearProperties() throws JMSException {
prop = new Hashtable();
propReadWrite = true;
}
public boolean propertyExists(String name) throws JMSException {
return prop.containsKey(name);
}
public boolean getBooleanProperty(String name) throws JMSException {
Object value = prop.get(name);
if (value == null)
throw new NullPointerException();
if (value instanceof Boolean)
return ((Boolean) value).booleanValue();
else if (value instanceof String)
return Boolean.getBoolean((String) value);
else
throw new MessageFormatException("Invalid conversion");
}
public byte getByteProperty(String name) throws JMSException {
Object value = prop.get(name);
if (value == null)
throw new NullPointerException();
if (value instanceof Byte)
return ((Byte) value).byteValue();
else if (value instanceof String)
return Byte.parseByte((String) value);
else
throw new MessageFormatException("Invalid conversion");
}
public short getShortProperty(String name) throws JMSException {
Object value = prop.get(name);
if (value == null)
throw new NullPointerException();
if (value instanceof Byte)
return ((Byte) value).shortValue();
else if (value instanceof Short)
return ((Short) value).shortValue();
else if (value instanceof String)
return Short.parseShort((String) value);
else
throw new MessageFormatException("Invalid conversion");
}
public int getIntProperty(String name) throws JMSException {
Object value = prop.get(name);
if (value == null)
throw new NullPointerException();
if (value instanceof Byte)
return ((Byte) value).intValue();
else if (value instanceof Short)
return ((Short) value).intValue();
else if (value instanceof Integer)
return ((Integer) value).intValue();
else if (value instanceof String)
return Integer.parseInt((String) value);
else
throw new MessageFormatException("Invalid conversion");
}
public long getLongProperty(String name) throws JMSException {
Object value = prop.get(name);
if (value == null)
throw new NullPointerException();
if (value instanceof Byte)
return ((Byte) value).longValue();
else if (value instanceof Short)
return ((Short) value).longValue();
else if (value instanceof Integer)
return ((Integer) value).longValue();
else if (value instanceof Long)
return ((Long) value).longValue();
else if (value instanceof String)
return Long.parseLong((String) value);
else
throw new MessageFormatException("Invalid conversion");
}
public float getFloatProperty(String name) throws JMSException {
Object value = prop.get(name);
if (value == null)
throw new NullPointerException();
if (value instanceof Float)
return ((Float) value).floatValue();
else if (value instanceof String)
return Float.parseFloat((String) value);
else
throw new MessageFormatException("Invalid conversion");
}
public double getDoubleProperty(String name) throws JMSException {
Object value = prop.get(name);
if (value == null)
throw new NullPointerException();
if (value instanceof Float)
return ((Float) value).doubleValue();
else if (value instanceof Double)
return ((Double) value).doubleValue();
else if (value instanceof String)
return Double.parseDouble((String) value);
else
throw new MessageFormatException("Invalid conversion");
}
public String getStringProperty(String name) throws JMSException {
Object value = prop.get(name);
if (value == null)
return null;
if (value instanceof Boolean)
return ((Boolean) value).toString();
else if (value instanceof Byte)
return ((Byte) value).toString();
else if (value instanceof Short)
return ((Short) value).toString();
else if (value instanceof Integer)
return ((Integer) value).toString();
else if (value instanceof Long)
return ((Long) value).toString();
else if (value instanceof Float)
return ((Float) value).toString();
else if (value instanceof Double)
return ((Double) value).toString();
else if (value instanceof String)
return (String) value;
else
throw new MessageFormatException("Invalid conversion");
}
public Object getObjectProperty(String name) throws JMSException {
Object value = prop.get(name);
return value;
}
public Enumeration getPropertyNames() throws JMSException {
return prop.keys();
}
void CheckPropertyName(String name) throws JMSException {
if (name.regionMatches(false, 0, "JMS_", 0, 4)) {
throw new JMSException("Bad property name");
}
if (name.regionMatches(false, 0, "JMSX", 0, 4)) {
if (name.equals("JMSXGroupId"))
return;
if (name.equals("JMSXGroupSeq"))
return;
throw new JMSException("Bad property name");
}
}
public void setBooleanProperty(String name, boolean value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite)
throw new MessageNotWriteableException("Properties are
read-only");
prop.put(name, new Boolean(value));
}
public void setByteProperty(String name, byte value) throws JMSException {
CheckPropertyName(name);
if (!propReadWrite)
throw new MessageNotWriteableException("Properties are
read-only");
prop.put(name, new Byte(value));
}
public void setShortProperty(String name, short value) throws JMSException {
CheckPropertyName(name);
if (!propReadWrite)
throw new MessageNotWriteableException("Properties are
read-only");
prop.put(name, new Short(value));
}
public void setIntProperty(String name, int value) throws JMSException {
CheckPropertyName(name);
if (!propReadWrite)
throw new MessageNotWriteableException("Properties are
read-only");
prop.put(name, new Integer(value));
}
public void setLongProperty(String name, long value) throws JMSException {
CheckPropertyName(name);
if (!propReadWrite)
throw new MessageNotWriteableException("Properties are
read-only");
prop.put(name, new Long(value));
}
public void setFloatProperty(String name, float value) throws JMSException {
CheckPropertyName(name);
if (!propReadWrite)
throw new MessageNotWriteableException("Properties are
read-only");
prop.put(name, new Float(value));
}
public void setDoubleProperty(String name, double value) throws JMSException {
CheckPropertyName(name);
if (!propReadWrite)
throw new MessageNotWriteableException("Properties are
read-only");
prop.put(name, new Double(value));
}
public void setStringProperty(String name, String value) throws JMSException {
CheckPropertyName(name);
if (!propReadWrite)
throw new MessageNotWriteableException("Properties are
read-only");
prop.put(name, new String(value));
}
public void setObjectProperty(String name, Object value) throws JMSException {
CheckPropertyName(name);
if (!propReadWrite)
throw new MessageNotWriteableException("Properties are
read-only");
if (value instanceof Boolean)
prop.put(name, value);
else if (value instanceof Byte)
prop.put(name, value);
else if (value instanceof Short)
prop.put(name, value);
else if (value instanceof Integer)
prop.put(name, value);
else if (value instanceof Long)
prop.put(name, value);
else if (value instanceof Float)
prop.put(name, value);
else if (value instanceof Double)
prop.put(name, value);
else if (value instanceof String)
prop.put(name, value);
else
throw new MessageFormatException("Invalid object type");
}
public void clearBody() throws JMSException {
//Inherited classes clear their content here
msgReadOnly = false;
}
/**
* acknowledge method comment.
*/
public void acknowledge() throws javax.jms.JMSException {
}
void setReadOnlyMode() {
propReadWrite = false;
msgReadOnly = true;
}
public boolean isOutdated() {
if (jmsExpiration == 0)
return false;
long ts = System.currentTimeMillis();
return jmsExpiration < ts;
}
String text;
/**
* Compares this object with the specified object for order. Returns a
* negative integer, zero, or a positive integer as this object is less
* than, equal to, or greater than the specified object.<p>
*
* The implementor must ensure <tt>sgn(x.compareTo(y)) ==
* -sgn(y.compareTo(x))</tt> for all <tt>x</tt> and <tt>y</tt>. (This
* implies that <tt>x.compareTo(y)</tt> must throw an exception iff
* <tt>y.compareTo(x)</tt> throws an exception.)<p>
*
* The implementor must also ensure that the relation is transitive:
* <tt>(x.compareTo(y)>0 && y.compareTo(z)>0)</tt> implies
* <tt>x.compareTo(z)>0</tt>.<p>
*
* Finally, the implementer must ensure that <tt>x.compareTo(y)==0</tt>
* implies that <tt>sgn(x.compareTo(z)) == sgn(y.compareTo(z))</tt>, for
* all <tt>z</tt>.<p>
*
* It is strongly recommended, but <i>not</i> strictly required that
* <tt>(x.compareTo(y)==0) == (x.equals(y))</tt>. Generally speaking, any
* class that implements the <tt>Comparable</tt> interface and violates
* this condition should clearly indicate this fact. The recommended
* language is "Note: this class has a natural ordering that is
* inconsistent with equals."
*
* @param o the Object to be compared.
* @return a negative integer, zero, or a positive integer as this object
* is less than, equal to, or greater than the specified object.
*
* @throws ClassCastException if the specified object's type prevents it
* from being compared to this Object.
*/
public int compareTo(java.lang.Object o) {
return 0;
}
/**
* Insert the method's description here.
* Creation date: (12/31/00 5:32:58 PM)
* @return java.lang.String
*/
public java.lang.String getText() {
return text;
}
/**
* Insert the method's description here.
* Creation date: (12/31/00 5:32:58 PM)
* @param newText java.lang.String
*/
public void setText(java.lang.String newText) {
text = newText;
}
}