Author: tabish
Date: Wed Oct 20 15:48:56 2010
New Revision: 1025620
URL: http://svn.apache.org/viewvc?rev=1025620&view=rev
Log:
apply patch (with modification to fix a null pointer violation case) fixes:
https://issues.apache.org/activemq/browse/AMQ-2990
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java?rev=1025620&r1=1025619&r2=1025620&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQOutputStream.java
Wed Oct 20 15:48:56 2010
@@ -38,10 +38,9 @@ import org.apache.activemq.util.IOExcept
*/
public class ActiveMQOutputStream extends OutputStream implements Disposable {
- // Send down 64k messages.
protected int count;
- final byte buffer[] = new byte[64 * 1024];
+ final byte buffer[];
private final ActiveMQConnection connection;
private final Map<String, Object> properties;
@@ -53,6 +52,11 @@ public class ActiveMQOutputStream extend
private final int priority;
private final long timeToLive;
+ /**
+ * JMS Property which is used to specify the size (in kb) which is used as
chunk size when splitting the stream. Default is 64kb
+ */
+ public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE";
+
public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId
producerId, ActiveMQDestination destination, Map<String, Object> properties,
int deliveryMode, int priority,
long timeToLive) throws JMSException {
this.connection = connection;
@@ -61,6 +65,19 @@ public class ActiveMQOutputStream extend
this.timeToLive = timeToLive;
this.properties = properties == null ? null : new HashMap<String,
Object>(properties);
+ Integer chunkSize = this.properties == null ? null : (Integer)
this.properties.get(AMQ_STREAM_CHUNK_SIZE);
+ if (chunkSize == null) {
+ chunkSize = 64 * 1024;
+ } else {
+ if (chunkSize < 1) {
+ throw new IllegalArgumentException("Chunk size must be greater
then 0");
+ } else {
+ chunkSize *= 1024;
+ }
+ }
+
+ buffer = new byte[chunkSize];
+
if (destination == null) {
throw new InvalidDestinationException("Don't understand null
destinations");
}