[
https://issues.apache.org/jira/browse/AMQ-4802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13794176#comment-13794176
]
Marc Leineweber commented on AMQ-4802:
--------------------------------------
Hello Timothy,
my test case consists of two classes - one producer and one comsumer. My real
implementation reads input from a file, but even with writing 0-bytes to the
OutputStream, the InputFile reading fails. I write 64 MB (64*1024*1024 bytes).
See my source code below.
Thanks in advance,
Marc
{code:title=The Consumer|borderStyle=solid}
package test;
import java.io.BufferedOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.NamingException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* Test class: test comsumer for ActiveMQInputStream.
*/
public class AMQStreamConsumer {
private static final Logger logger =
Logger.getLogger(AMQStreamConsumer.class.getName());
public static final String outFilepath = "MyTestFile";
/**
* @param args the command line arguments
*/
public static void main(String[] args) throws IOException {
try {
readInputStream("AMQStreamTest");
System.exit(0);
} catch (JMSException ex) {
Logger.getLogger(AMQStreamConsumer.class.getName()).log(Level.SEVERE, null, ex);
} catch (NamingException ex) {
Logger.getLogger(AMQStreamConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
/**
* Reads the inputStream from the given queue.
*
* @param aStreamQueueName
* @throws JMSException
* @throws NamingException
*/
private static void readInputStream(String aStreamQueueName) throws
JMSException, NamingException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("guest", "guest", "tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE); // CLIENT_ACKNOWLEDGE makes no difference
String exclusiveQueueName = aStreamQueueName; // +
"?consumer.exclusive=true"; // use the queue exclusively makes no difference
byte[] buf = new byte[1024];
OutputStream fileOutputStream = null;
InputStream queueInputStream = null;
try {
fileOutputStream = new BufferedOutputStream(new
FileOutputStream(outFilepath));
Queue destination =
session.createQueue(exclusiveQueueName);
queueInputStream =
((ActiveMQConnection)connection).createInputStream(destination);
long sumSize = 0;
int counter = 0;
for (int readCount = queueInputStream.read(buf);
readCount != -1; readCount = queueInputStream.read(buf)) {
sumSize += readCount;
counter++;
logger.log(Level.INFO, "read [{0}] from input
stream (bytes so far {1})", new Object[] {counter, sumSize});
fileOutputStream.write(buf, 0, readCount);
}
} finally {
if (fileOutputStream != null) {
fileOutputStream.close();
}
if (queueInputStream != null) {
queueInputStream.close();
}
}
session.close();
connection.close();
}
}
{code}
{code:title=The Producer|borderStyle=solid}
package test;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.naming.NamingException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* Test class: test producer for ActiveMQOutputStream.
*/
public class AMQStreamProducer {
private static final Logger logger =
Logger.getLogger(AMQStreamProducer.class.getName());
/**
* @param args the command line arguments
*/
public static void main(String[] args) throws IOException {
try {
sendOutputStream("AMQStreamTest");
logger.info("outputstream sent");
System.exit(0);
} catch (JMSException ex) {
Logger.getLogger(AMQStreamProducer.class.getName()).log(Level.SEVERE, null, ex);
} catch (NamingException ex) {
Logger.getLogger(AMQStreamProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
/**
* Sends an OutputStream with 0 bytes.
*/
protected static void sendOutputStream(String aQueueName) throws
IOException, JMSException, NamingException {
Connection connection;
Session session = null;
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616");
connection = connectionFactory.createConnection();
session = connection.createSession(false,
session.AUTO_ACKNOWLEDGE);
Destination streamDestination = session.createQueue(aQueueName);
Map messageProperties = new HashMap<String, String>();
messageProperties.put("myProperty", "TEST");
OutputStream out =
((ActiveMQConnection)connection).createOutputStream(streamDestination,
messageProperties, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY,
Message.DEFAULT_TIME_TO_LIVE);
for(int i=0; i< 4*1024*1024; i++) {
out.write(0);
}
out.close();
session.close();
connection.close();
}
}
{code}
> ActiveMQInputStream does not work for big content
> -------------------------------------------------
>
> Key: AMQ-4802
> URL: https://issues.apache.org/jira/browse/AMQ-4802
> Project: ActiveMQ
> Issue Type: Bug
> Components: JMS client
> Affects Versions: 5.8.0
> Environment: Mac OS 10.6.8
> Reporter: Marc Leineweber
>
> Sending a file with size 4.6 MB with ActiveMQOutputStream works and results
> in 71 messages on the ActiveMQ broker.
> Trying to receive this file / these messages using an ActiveMQInputStream
> fails - after a while (720896 bytes received so far) inputStream.read(buffer)
> never answers (it waits in ...). If inputStream is created with a timeout, I
> get a ActiveMQInputStream$ReadTimeoutEcxeption.
--
This message was sent by Atlassian JIRA
(v6.1#6144)