Mansour Al Akeel created QPID-6587:
--------------------------------------
Summary: Qpid crash when trying to convert a message
Key: QPID-6587
URL: https://issues.apache.org/jira/browse/QPID-6587
Project: Qpid
Issue Type: Bug
Components: Java Broker
Affects Versions: 0.32
Environment: Linux localhost 3.17.8-gentoo-r1 #6 SMP Sat May 16
15:11:24 GST 2015 x86_64 Intel(R) Core(TM) i5-2540M CPU @ 2.60GHz GenuineIntel
GNU/Linux
Reporter: Mansour Al Akeel
Priority: Blocker
Fix For: 0.33
Related Issues: QPID-5536
This issue may be related to converting message format.
To generate this problem, create a new port (ie AMQP_0_10) and give it
ampq_0-10, assign it another port number (for example 5673).
Setup the security provider to Anonymous (create a provider for this).
Create a queue "examples".
Send few messages to "examples" to AMQP port.
Try to read from AMQP_0_10.
The broker will crash end exit.
In my case, the writing was done from Java, and reading from Python.
These code can be used:
For maven:
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.2.0</version>
</dependency>
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
public class Hello
{
private static final String USER = "guest";
private static final String PASSWORD = "guest";
public static void main(String[] args)
{
try
{
Properties prop = new Properties();
prop.put("java.naming.factory.initial",
"org.apache.qpid.jms.jndi.JmsInitialContextFactory");
prop.put("connectionfactory.localhost",
"amqp://localhost:5672/");
prop.put("queue.myQueue", "examples");
Context context = new InitialContext(prop);
ConnectionFactory factory = (ConnectionFactory)
context.lookup("localhost");
Destination queue = (Destination)
context.lookup("myQueue");
Connection connection = factory.createConnection(USER,
PASSWORD);
connection.setExceptionListener(new
MyExceptionListener());
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i <= 10; i++)
{
MessageProducer producer =
session.createProducer(queue);
TextMessage message =
session.createTextMessage("Hello world from Java " + i + " !");
Thread.sleep(1000L);
producer.send(message,
DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY,
Message.DEFAULT_TIME_TO_LIVE);
}
connection.close();
} catch (Exception exp)
{
System.out.println("Caught exception, exiting.");
exp.printStackTrace();
System.exit(1);
}
}
private static class MyExceptionListener implements ExceptionListener
{
public void onException(JMSException exception)
{
System.out.println("Connection ExceptionListener fired,
exiting.");
exception.printStackTrace();
System.exit(1);
}
}
}
#!/usr/bin/env python
from qpid.messaging import *
url = "localhost:5673"
queue = "examples"
connection = Connection(url)
try:
connection.open()
session = connection.session()
while True:
receiver = session.receiver(queue)
message = receiver.fetch()
print "got message"
print message.content
#session.acknowledge()
except MessagingError,m:
print "exeption: " , m
connection.close()
Note: If the queue is empty and we try to read, no writing can be done. The
client (java), will throw an exception:
javax.jms.JMSException: java.lang.NullPointerException [condition =
amqp:connection:forced]
at
org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.getRemoteError(AmqpAbstractResource.java:234)
at
org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.remotelyClosed(AmqpAbstractResource.java:170)
at
org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.processRemoteClose(AmqpAbstractResource.java:291)
at
org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:734)
at
org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1500(AmqpProvider.java:87)
at
org.apache.qpid.jms.provider.amqp.AmqpProvider$16.run(AmqpProvider.java:667)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
If the queue has messages, the broker will crash when we consuming messages
(python).
The following trace generated by the broker:
2015-06-13 11:48:57,262 DEBUG [IoReceiver - /127.0.0.1:48469]
(transport.Connection) - RECV: [conn:29464cc5] ch=0 MessageFlow(destination=0,
unit=MESSAGE, value=1)
2015-06-13 11:48:57,262 DEBUG [IoReceiver - /127.0.0.1:48469]
(transport.Session) - identify: ch=0, commandId=5
2015-06-13 11:48:57,262 DEBUG [IoReceiver - /127.0.0.1:48469]
(transport.Session) - ssn:"fb01ce9e-fc06-455a-abd7-c1c7df6b8664:0" ch=0
processed([5,5]) 4 4
2015-06-13 11:48:57,262 DEBUG [IoReceiver - /127.0.0.1:48469]
(transport.Session) - ssn:"fb01ce9e-fc06-455a-abd7-c1c7df6b8664:0" processed:
{[0, 4]}
2015-06-13 11:48:57,262 DEBUG [pool-1-thread-15] (transport.Connection) -
FLUSH: [conn:29464cc5]
2015-06-13 11:48:57,262 DEBUG [pool-1-thread-15] (transport.Connection) -
FLUSH: [conn:29464cc5]
########################################################################
#
# Unhandled Exception java.lang.NullPointerException in Thread pool-1-thread-15
#
# Exiting
#
########################################################################
java.lang.NullPointerException
at java.util.HashMap.putMapEntries(HashMap.java:500)
at java.util.HashMap.<init>(HashMap.java:489)
at
org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0$MessageHeader_1_0.getHeadersAsMap(MessageMetaData_1_0.java:586)
at
org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convertMetaData(MessageConverter_1_0_to_v0_10.java:181)
at
org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convertToStoredMessage(MessageConverter_1_0_to_v0_10.java:70)
at
org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convert(MessageConverter_1_0_to_v0_10.java:60)
at
org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convert(MessageConverter_1_0_to_v0_10.java:42)
at
org.apache.qpid.server.protocol.v0_10.ConsumerTarget_0_10.send(ConsumerTarget_0_10.java:218)
at
org.apache.qpid.server.queue.QueueConsumerImpl.send(QueueConsumerImpl.java:469)
at
org.apache.qpid.server.queue.AbstractQueue.deliverMessage(AbstractQueue.java:1323)
at
org.apache.qpid.server.queue.AbstractQueue.attemptDelivery(AbstractQueue.java:2069)
at
org.apache.qpid.server.queue.AbstractQueue.processQueue(AbstractQueue.java:2237)
at org.apache.qpid.server.queue.QueueRunner$1.run(QueueRunner.java:77)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:360)
at org.apache.qpid.server.queue.QueueRunner.run(QueueRunner.java:68)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2015-06-13 11:48:57,264 ERROR [pool-1-thread-15] (server.Main) - Uncaught
exception, shutting down.
java.lang.NullPointerException
at java.util.HashMap.putMapEntries(HashMap.java:500)
at java.util.HashMap.<init>(HashMap.java:489)
at
org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0$MessageHeader_1_0.getHeadersAsMap(MessageMetaData_1_0.java:586)
at
org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convertMetaData(MessageConverter_1_0_to_v0_10.java:181)
at
org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convertToStoredMessage(MessageConverter_1_0_to_v0_10.java:70)
at
org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convert(MessageConverter_1_0_to_v0_10.java:60)
at
org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convert(MessageConverter_1_0_to_v0_10.java:42)
at
org.apache.qpid.server.protocol.v0_10.ConsumerTarget_0_10.send(ConsumerTarget_0_10.java:218)
at
org.apache.qpid.server.queue.QueueConsumerImpl.send(QueueConsumerImpl.java:469)
at
org.apache.qpid.server.queue.AbstractQueue.deliverMessage(AbstractQueue.java:1323)
at
org.apache.qpid.server.queue.AbstractQueue.attemptDelivery(AbstractQueue.java:2069)
at
org.apache.qpid.server.queue.AbstractQueue.processQueue(AbstractQueue.java:2237)
at org.apache.qpid.server.queue.QueueRunner$1.run(QueueRunner.java:77)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:360)
at org.apache.qpid.server.queue.QueueRunner.run(QueueRunner.java:68)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2015-06-13 11:48:57,265 INFO [pool-1-thread-16] (subscription.state) - [Queue
Delivery] [sub:1(vh(/default)/qu(examples)] SUB-1003 : State : SUSPENDED
The reciever will produce this error (python):
exeption: connection aborted
Traceback (most recent call last):
File "examples/api/hello", line 27, in <module>
connection.close()
File "<string>", line 6, in close
File
"/home/mansour/workspace/sources/qpid-python-0.32/qpid/messaging/endpoints.py",
line 345, in close
ssn.close(timeout=timeout)
File "<string>", line 6, in close
File
"/home/mansour/workspace/sources/qpid-python-0.32/qpid/messaging/endpoints.py",
line 795, in close
self.sync(timeout=timeout)
File "<string>", line 6, in sync
File
"/home/mansour/workspace/sources/qpid-python-0.32/qpid/messaging/endpoints.py",
line 786, in sync
if not self._ewait(lambda: not self.outgoing and not self.acked,
timeout=timeout):
File
"/home/mansour/workspace/sources/qpid-python-0.32/qpid/messaging/endpoints.py",
line 596, in _ewait
result = self.connection._ewait(lambda: self.error or predicate(), timeout)
File
"/home/mansour/workspace/sources/qpid-python-0.32/qpid/messaging/endpoints.py",
line 234, in _ewait
self.check_error()
File
"/home/mansour/workspace/sources/qpid-python-0.32/qpid/messaging/endpoints.py",
line 227, in check_error
raise e
qpid.messaging.exceptions.ConnectionError: connection aborted
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]