[ 
https://issues.apache.org/jira/browse/OPENWIRE-16?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276621#comment-16276621
 ] 

Marcel Hillmann commented on OPENWIRE-16:
-----------------------------------------

import java.util.ArrayList;
import java.util.List;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.junit.Test;

public class QueueObserverTest implements ExceptionListener, MessageListener {


        @Test
        public void register() throws Exception {
                ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory("nio://localhost:61616");
                ActiveMQConnection connection = (ActiveMQConnection) 
factory.createConnection("admin",
                                "admin");
                connection.setExceptionListener(this);
                connection.start();
//              connection.setClientID(UUID.randomUUID().toString());
                ActiveMQSession session = (ActiveMQSession) 
connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                session.setMessageListener(this);
                
                final Queue queue = new ActiveMQQueue("loggingQueue");
                final Topic topic = new ActiveMQTopic("loggingQueue");
                
                
                final List<Topic> topics = new ArrayList<>();
                topics.add(new ActiveMQTopic("ActiveMQ.Advisory.Queue"));
                topics.add(new ActiveMQTopic("ActiveMQ.Advisory.Topic"));
                topics.add(AdvisorySupport.getConnectionAdvisoryTopic());
//
                topics.add(AdvisorySupport.getProducerAdvisoryTopic(queue));
                topics.add(AdvisorySupport.getConsumerAdvisoryTopic(queue));
                topics.add(AdvisorySupport.getNoConsumersAdvisoryTopic(queue));
                
topics.add(AdvisorySupport.getMessageDeliveredAdvisoryTopic(queue));
//              
                topics.add(AdvisorySupport.getProducerAdvisoryTopic(topic));
                topics.add(AdvisorySupport.getConsumerAdvisoryTopic(topic));
                topics.add(AdvisorySupport.getNoConsumersAdvisoryTopic(topic));
                
topics.add(AdvisorySupport.getMessageDeliveredAdvisoryTopic(topic));
                
                topics.forEach((_topic) -> {
                        try {
                                consume(session, _topic);
                        }
                        catch (JMSException e) {
                                e.printStackTrace();
                        }
                });
                connection.start();
                while (connection.isStarted() && !connection.isClosed()) {
                        Thread.sleep(1000);
                }

        }

        
        private final void consume(Session session, Topic topic) throws 
JMSException{
                System.out.println(topic);
                MessageConsumer consumer = session.createConsumer(topic);
                consumer.setMessageListener(this);
        }
        @Override
        public void onException(JMSException exception) {
                exception.printStackTrace();
        }

        @Override
        public void onMessage(Message message) {
                if (message instanceof ActiveMQMessage) {
                        ActiveMQMessage aMsg = (ActiveMQMessage) message;
                        DataStructure dataStructure = aMsg.getDataStructure();
                        if(dataStructure instanceof ConnectionInfo){
                                ConnectionInfo connectionInfo = 
(ConnectionInfo) dataStructure;
                                System.out.println("Connect: 
"+connectionInfo.toString());
                        }else if(dataStructure instanceof RemoveInfo){
                                RemoveInfo removeInfo = (RemoveInfo) 
dataStructure;
                                System.out.println("Remove : 
"+removeInfo.toString());
                        }else if(dataStructure instanceof ProducerInfo){
                                ProducerInfo producerInfo = (ProducerInfo) 
dataStructure;
                                System.out.println("Produce: 
"+producerInfo.toString());
                        }else if( message instanceof 
org.apache.activemq.command.Message){
                                org.apache.activemq.command.Message aMessage  = 
(org.apache.activemq.command.Message) dataStructure;
                                System.out.println("Message: 
"+aMessage.toString());
                        }
                }
                else {
                        System.out.println("QueueObserverTest.onMessage(): 
"+message.getClass().getSimpleName());
                }

        }
}

> ClassCastException: ConnectionAdvisoryTopic
> -------------------------------------------
>
>                 Key: OPENWIRE-16
>                 URL: https://issues.apache.org/jira/browse/OPENWIRE-16
>             Project: ActiveMQ OpenWire
>          Issue Type: Bug
>         Environment: Windows 7
> java version "1.8.0_102"
> Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
>            Reporter: Marcel Hillmann
>
> Hi,
> If I'm try to consume the connection advisory topic it raise an 
> ClassCastException.
> Is this an known Isssue?
> How can I fixit?
> {code:java}
> Caused by: java.io.IOException: org.apache.activemq.command.BrokerId cannot 
> be cast to org.apache.activemq.command.ConsumerId
>       at 
> org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:40)
>       ... 7 more
> Caused by: java.lang.ClassCastException: org.apache.activemq.command.BrokerId 
> cannot be cast to org.apache.activemq.command.ConsumerId
>       at 
> org.apache.activemq.openwire.v12.MessageMarshaller.tightUnmarshal(MessageMarshaller.java:75)
>       at 
> org.apache.activemq.openwire.v12.ActiveMQMessageMarshaller.tightUnmarshal(ActiveMQMessageMarshaller.java:66)
>       at 
> org.apache.activemq.openwire.OpenWireFormat.tightUnmarshalNestedObject(OpenWireFormat.java:456)
>       at 
> org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightUnmarsalNestedObject(BaseDataStreamMarshaller.java:125)
>       at 
> org.apache.activemq.openwire.v12.MessageDispatchMarshaller.tightUnmarshal(MessageDispatchMarshaller.java:71)
>       at 
> org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireFormat.java:365)
>       at 
> org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:278)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to