Ed Kaltenbach created ARTEMIS-630:
-------------------------------------

             Summary: STOMP server quits sending to all subscribers when one 
client disconnects
                 Key: ARTEMIS-630
                 URL: https://issues.apache.org/jira/browse/ARTEMIS-630
             Project: ActiveMQ Artemis
          Issue Type: Bug
          Components: Stomp
    Affects Versions: 1.1.0
         Environment: Wildfly 10.0.0, Windows 7 64 bit, Java 1.8.0_91
            Reporter: Ed Kaltenbach


Multiple clients connected to a JMS topic via STOMP.  When one client 
disconnects from the server then all clients quit receiving messages and cannot 
send messages.  As soon as a new client sends a SUBSCRIBE message then all 
clients begin receiving messages again.

Here is the way STOMP and the JMS topic is defined in standalone.xml:
<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
            <server name="default">
                <security-setting name="#">
                    <role name="guest" delete-non-durable-queue="true" 
create-non-durable-queue="true" consume="true" send="true"/>
                </security-setting>
                <address-setting name="#" 
message-counter-history-day-limit="10" page-size-bytes="2097152" 
max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" 
dead-letter-address="jms.queue.DLQ"/>
                <http-connector name="http-connector" endpoint="http-acceptor" 
socket-binding="http"/>
                <http-connector name="http-connector-throughput" 
endpoint="http-acceptor-throughput" socket-binding="http">
                    <param name="batch-delay" value="50"/>
                </http-connector>
                <in-vm-connector name="in-vm" server-id="0"/>
                <http-acceptor name="http-acceptor" http-listener="default"/>
                <http-acceptor name="http-acceptor-throughput" 
http-listener="default">
                    <param name="batch-delay" value="50"/>
                    <param name="direct-deliver" value="false"/>
                </http-acceptor>
                <in-vm-acceptor name="in-vm" server-id="0"/>
                <acceptor name="stomp-acceptor" 
factory-class="org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory">
                    <param name="protocols" value="STOMP"/>
                    <param name="port" value="61613"/>
                </acceptor>
                <jms-queue name="ExpiryQueue" 
entries="java:/jms/queue/ExpiryQueue"/>
                <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
                <jms-topic name="ACRS_Exit" 
entries="java:/jms/topic/ACRS_Exit"/>
                <connection-factory name="InVmConnectionFactory" 
entries="java:/ConnectionFactory" connectors="in-vm"/>
                <connection-factory name="RemoteConnectionFactory" 
entries="java:jboss/exported/jms/RemoteConnectionFactory" 
connectors="http-connector"/>
                <pooled-connection-factory name="activemq-ra" transaction="xa" 
entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" 
connectors="in-vm"/>
            </server>
        </subsystem>


Run multiple instances of the program listed below.  Stagger the starts by a 
minute or so.  As soon and one instance of the program completes, all other 
instances will start having problems sending messages to the topic.  The server 
will send a response that says "Destination does not exist\c 
jms.topic.ACRS_Exit".  If you start another instance then all other running 
instances will start receiving messages and will be able to send messages.

Here is the code for the sample program:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Future;

public class StompClient {
  public static void main(String[] args) throws Exception {
    StompClient foo = new StompClient();
  }

  public StompClient() throws Exception {
    AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
    SocketAddress serverAddr = new InetSocketAddress("localhost", 61613);
    Future<Void> result = channel.connect(serverAddr);
    result.get();
    System.out.println("Socket Connected");

    // start two threads, one for reading and one for writing
    ReaderThread rt = new ReaderThread(channel);
    Thread readerThread = new Thread(rt);
    readerThread.start();

    Thread writerThread = new Thread(new WriterThread(channel));
    writerThread.start();
  }

  protected class ReaderThread implements Runnable {
    AsynchronousSocketChannel m_channel;
    public ReaderThread(AsynchronousSocketChannel channel) {
      m_channel = channel;
    }

    public void run() {
      String outputStr;
      Integer readByteCnt;

      while (true) {
        ByteBuffer buffer = ByteBuffer.allocate(2048);
        buffer.clear();
        Future<Integer> result = m_channel.read(buffer);
        try
        {
          readByteCnt = (Integer)result.get();
        } catch (Exception ex) {
          readByteCnt = -1;
        }
        if (readByteCnt > -1)
        {
          // convert the bytes to a string
          try
          {
            outputStr = new String(buffer.array(), "UTF-8");
            outputStr = outputStr.trim();
            System.out.println("-----Beginning of Message From Stomp Server:");
            System.out.println(outputStr);
            System.out.println("-----Ending of Message From Stomp Server");
          } catch (Exception ex) {
            System.out.println("ReaderThread Exception:" + ex.getMessage());
          }
        }
      }
    }
  }

  protected class WriterThread implements Runnable {
    AsynchronousSocketChannel m_channel;
    public WriterThread(AsynchronousSocketChannel channel) {
      m_channel = channel;
    }

    public void run() {
      String topicName = "jms.topic.ACRS_Exit";
      ByteBuffer buffer = ByteBuffer.allocate(2048);
      Charset cs = Charset.forName("UTF-8");
      Integer writtenByteCnt;
      Future<Integer> result;
      byte[] data;
      String msg;

      try
      {
        msg = "CONNECT\n";
        msg = msg + "accept-version:1.2\n";
        msg = msg + "heart-beat:5000,5000\n";
        msg = msg + "login:dynsub\n";
        msg = msg + "passcode:dynsub\n";
        msg = msg + "\n";
        msg = msg + '\0';

        SendMessageToStomp(m_channel, msg);

        java.lang.Thread.sleep(5000);

        msg = "SUBSCRIBE\n";
        msg = msg + "destination:" + topicName + "\n";
        msg = msg + "id:dest1\n";
        msg = msg + "ack:auto\n";
        msg = msg + "\n";
        msg = msg + '\0';

        SendMessageToStomp(m_channel, msg);

        // send a heartbeat message every 5 seconds
        // NOTE: this was changed to send the date and time instead of an empty 
message
        DateFormat dateFormat = new SimpleDateFormat("yyyy-mm-dd hh:mm:ss");
        for(int i = 0; i < 20; i++)
        {
          java.lang.Thread.sleep(5000);
          SendMessageToStomp(m_channel, "\n");

          // NOTE: this was changed to also send a message to the topic
          Date d = new Date();
          String dateStr = dateFormat.format(d);
          msg = "SEND\n";
          msg += "destination:" + topicName + "\n";
          msg += "content-type:text/plain\n";
          msg += "content-length:" + dateStr.length() + "\n";
          msg += "\n";
          msg += dateStr;
          msg += '\0';
          SendMessageToStomp(m_channel, msg);
        }

        java.lang.Thread.sleep(5000);

        // now test the unsubscribe
        msg = "UNSUBSCRIBE\n";
        msg = msg + "id:dest1\n";
        msg = msg + "\n";
        msg = msg + '\0';
        SendMessageToStomp(m_channel, msg);

        msg = "DISCONNECT\n";
        msg = msg + '\0';
        SendMessageToStomp(m_channel, msg);

        m_channel.shutdownInput();
        m_channel.shutdownOutput();
        m_channel.close();

        System.exit(0);

/*
        // if the UNSUBSCRIBE, DISCONNECT, and socket shutdown/close
        // are removed and the following added, the server will
        // continue to send messages to other clients for about 20 seconds.

        // send a heartbeat message every 5 seconds
        while(true)
        {
          java.lang.Thread.sleep(5000);
          SendMessageToStomp(m_channel, "\n");
        }
*/

      } catch (Exception ex) {
        System.out.println("WriterThread Exception:" + ex.getMessage());
      }
    }
  }

  protected void SendMessageToStomp(AsynchronousSocketChannel channel, String 
msg) throws Exception {
    ByteBuffer buffer = ByteBuffer.allocate(2048);
    Charset cs = Charset.forName("UTF-8");
    Integer writtenByteCnt;
    Future<Integer> result;
    byte[] data;

    data = msg.getBytes(cs);
    buffer.put(data);
    buffer.flip();

    System.out.println("-----Sending message to Stomp Server:\n" + msg);
    result = channel.write(buffer);
    writtenByteCnt = (Integer)result.get();
  }
}








--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to