Ed Kaltenbach created ARTEMIS-630:
-------------------------------------
Summary: STOMP server quits sending to all subscribers when one
client disconnects
Key: ARTEMIS-630
URL: https://issues.apache.org/jira/browse/ARTEMIS-630
Project: ActiveMQ Artemis
Issue Type: Bug
Components: Stomp
Affects Versions: 1.1.0
Environment: Wildfly 10.0.0, Windows 7 64 bit, Java 1.8.0_91
Reporter: Ed Kaltenbach
Multiple clients connected to a JMS topic via STOMP. When one client
disconnects from the server then all clients quit receiving messages and cannot
send messages. As soon as a new client sends a SUBSCRIBE message then all
clients begin receiving messages again.
Here is the way STOMP and the JMS topic is defined in standalone.xml:
<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
<server name="default">
<security-setting name="#">
<role name="guest" delete-non-durable-queue="true"
create-non-durable-queue="true" consume="true" send="true"/>
</security-setting>
<address-setting name="#"
message-counter-history-day-limit="10" page-size-bytes="2097152"
max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue"
dead-letter-address="jms.queue.DLQ"/>
<http-connector name="http-connector" endpoint="http-acceptor"
socket-binding="http"/>
<http-connector name="http-connector-throughput"
endpoint="http-acceptor-throughput" socket-binding="http">
<param name="batch-delay" value="50"/>
</http-connector>
<in-vm-connector name="in-vm" server-id="0"/>
<http-acceptor name="http-acceptor" http-listener="default"/>
<http-acceptor name="http-acceptor-throughput"
http-listener="default">
<param name="batch-delay" value="50"/>
<param name="direct-deliver" value="false"/>
</http-acceptor>
<in-vm-acceptor name="in-vm" server-id="0"/>
<acceptor name="stomp-acceptor"
factory-class="org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory">
<param name="protocols" value="STOMP"/>
<param name="port" value="61613"/>
</acceptor>
<jms-queue name="ExpiryQueue"
entries="java:/jms/queue/ExpiryQueue"/>
<jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
<jms-topic name="ACRS_Exit"
entries="java:/jms/topic/ACRS_Exit"/>
<connection-factory name="InVmConnectionFactory"
entries="java:/ConnectionFactory" connectors="in-vm"/>
<connection-factory name="RemoteConnectionFactory"
entries="java:jboss/exported/jms/RemoteConnectionFactory"
connectors="http-connector"/>
<pooled-connection-factory name="activemq-ra" transaction="xa"
entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory"
connectors="in-vm"/>
</server>
</subsystem>
Run multiple instances of the program listed below. Stagger the starts by a
minute or so. As soon and one instance of the program completes, all other
instances will start having problems sending messages to the topic. The server
will send a response that says "Destination does not exist\c
jms.topic.ACRS_Exit". If you start another instance then all other running
instances will start receiving messages and will be able to send messages.
Here is the code for the sample program:
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Future;
public class StompClient {
public static void main(String[] args) throws Exception {
StompClient foo = new StompClient();
}
public StompClient() throws Exception {
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
SocketAddress serverAddr = new InetSocketAddress("localhost", 61613);
Future<Void> result = channel.connect(serverAddr);
result.get();
System.out.println("Socket Connected");
// start two threads, one for reading and one for writing
ReaderThread rt = new ReaderThread(channel);
Thread readerThread = new Thread(rt);
readerThread.start();
Thread writerThread = new Thread(new WriterThread(channel));
writerThread.start();
}
protected class ReaderThread implements Runnable {
AsynchronousSocketChannel m_channel;
public ReaderThread(AsynchronousSocketChannel channel) {
m_channel = channel;
}
public void run() {
String outputStr;
Integer readByteCnt;
while (true) {
ByteBuffer buffer = ByteBuffer.allocate(2048);
buffer.clear();
Future<Integer> result = m_channel.read(buffer);
try
{
readByteCnt = (Integer)result.get();
} catch (Exception ex) {
readByteCnt = -1;
}
if (readByteCnt > -1)
{
// convert the bytes to a string
try
{
outputStr = new String(buffer.array(), "UTF-8");
outputStr = outputStr.trim();
System.out.println("-----Beginning of Message From Stomp Server:");
System.out.println(outputStr);
System.out.println("-----Ending of Message From Stomp Server");
} catch (Exception ex) {
System.out.println("ReaderThread Exception:" + ex.getMessage());
}
}
}
}
}
protected class WriterThread implements Runnable {
AsynchronousSocketChannel m_channel;
public WriterThread(AsynchronousSocketChannel channel) {
m_channel = channel;
}
public void run() {
String topicName = "jms.topic.ACRS_Exit";
ByteBuffer buffer = ByteBuffer.allocate(2048);
Charset cs = Charset.forName("UTF-8");
Integer writtenByteCnt;
Future<Integer> result;
byte[] data;
String msg;
try
{
msg = "CONNECT\n";
msg = msg + "accept-version:1.2\n";
msg = msg + "heart-beat:5000,5000\n";
msg = msg + "login:dynsub\n";
msg = msg + "passcode:dynsub\n";
msg = msg + "\n";
msg = msg + '\0';
SendMessageToStomp(m_channel, msg);
java.lang.Thread.sleep(5000);
msg = "SUBSCRIBE\n";
msg = msg + "destination:" + topicName + "\n";
msg = msg + "id:dest1\n";
msg = msg + "ack:auto\n";
msg = msg + "\n";
msg = msg + '\0';
SendMessageToStomp(m_channel, msg);
// send a heartbeat message every 5 seconds
// NOTE: this was changed to send the date and time instead of an empty
message
DateFormat dateFormat = new SimpleDateFormat("yyyy-mm-dd hh:mm:ss");
for(int i = 0; i < 20; i++)
{
java.lang.Thread.sleep(5000);
SendMessageToStomp(m_channel, "\n");
// NOTE: this was changed to also send a message to the topic
Date d = new Date();
String dateStr = dateFormat.format(d);
msg = "SEND\n";
msg += "destination:" + topicName + "\n";
msg += "content-type:text/plain\n";
msg += "content-length:" + dateStr.length() + "\n";
msg += "\n";
msg += dateStr;
msg += '\0';
SendMessageToStomp(m_channel, msg);
}
java.lang.Thread.sleep(5000);
// now test the unsubscribe
msg = "UNSUBSCRIBE\n";
msg = msg + "id:dest1\n";
msg = msg + "\n";
msg = msg + '\0';
SendMessageToStomp(m_channel, msg);
msg = "DISCONNECT\n";
msg = msg + '\0';
SendMessageToStomp(m_channel, msg);
m_channel.shutdownInput();
m_channel.shutdownOutput();
m_channel.close();
System.exit(0);
/*
// if the UNSUBSCRIBE, DISCONNECT, and socket shutdown/close
// are removed and the following added, the server will
// continue to send messages to other clients for about 20 seconds.
// send a heartbeat message every 5 seconds
while(true)
{
java.lang.Thread.sleep(5000);
SendMessageToStomp(m_channel, "\n");
}
*/
} catch (Exception ex) {
System.out.println("WriterThread Exception:" + ex.getMessage());
}
}
}
protected void SendMessageToStomp(AsynchronousSocketChannel channel, String
msg) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(2048);
Charset cs = Charset.forName("UTF-8");
Integer writtenByteCnt;
Future<Integer> result;
byte[] data;
data = msg.getBytes(cs);
buffer.put(data);
buffer.flip();
System.out.println("-----Sending message to Stomp Server:\n" + msg);
result = channel.write(buffer);
writtenByteCnt = (Integer)result.get();
}
}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)