Author: gtully
Date: Mon Oct 11 19:06:02 2010
New Revision: 1021466
URL: http://svn.apache.org/viewvc?rev=1021466&view=rev
Log:
Resolve https://issues.apache.org/activemq/browse/AMQ-2103 and
https://issues.apache.org/activemq/browse/AMQ-2966. The fix for AMQ-2103 is
implemented as a boolean queue policy named reduceMemoryFootprint: when it is
set, the internal state of a message is cleared after the message is persisted.
This works at a natural sync point in the broker and avoids contention. The
marshalling contention introduced by the original patch is what resulted in
AMQ-2966.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Mon Oct 11 19:06:02 2010
@@ -86,6 +86,7 @@ public abstract class BaseDestination im
private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
private boolean gcIfInactive;
private long lastActiveTime=0l;
+ private boolean reduceMemoryFootprint = false;
/**
* @param broker
@@ -662,5 +663,12 @@ public abstract class BaseDestination im
}
return result;
}
-
+
+ public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
+ this.reduceMemoryFootprint = reduceMemoryFootprint;
+ }
+
+ protected boolean isReduceMemoryFootprint() {
+ return this.reduceMemoryFootprint;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Mon Oct 11 19:06:02 2010
@@ -668,6 +668,9 @@ public class Queue extends BaseDestinati
if (store != null && message.isPersistent()) {
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
result = store.asyncAddQueueMessage(context, message);
+ if (isReduceMemoryFootprint()) {
+ message.clearMarshalledState();
+ }
}
if (context.isInTransaction()) {
// If this is a transacted message.. increase the usage now so
that
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Mon Oct 11 19:06:02 2010
@@ -91,6 +91,7 @@ public class PolicyEntry extends Destina
private boolean allConsumersExclusiveByDefault;
private boolean gcInactiveDestinations;
private long inactiveTimoutBeforeGC =
BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
+ private boolean reduceMemoryFootprint;
public void configure(Broker broker,Queue queue) {
@@ -163,6 +164,7 @@ public class PolicyEntry extends Destina
destination.setPrioritizedMessages(isPrioritizedMessages());
destination.setGcIfInactive(isGcInactiveDestinations());
destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
+ destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
}
public void configure(Broker broker, SystemUsage memoryManager,
TopicSubscription subscription) {
@@ -780,5 +782,12 @@ public class PolicyEntry extends Destina
public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
}
+
+ public boolean isReduceMemoryFootprint() {
+ return reduceMemoryFootprint;
+ }
+ public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
+ this.reduceMemoryFootprint = reduceMemoryFootprint;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
Mon Oct 11 19:06:02 2010
@@ -118,6 +118,11 @@ public class ActiveMQMapMessage extends
storeContent();
}
+ public void clearMarshalledState() throws JMSException {
+ super.clearMarshalledState();
+ map.clear();
+ }
+
private void storeContent() {
try {
if (getContent() == null && !map.isEmpty()) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
Mon Oct 11 19:06:02 2010
@@ -201,6 +201,11 @@ public class ActiveMQObjectMessage exten
storeContent();
}
+ public void clearMarshalledState() throws JMSException {
+ super.clearMarshalledState();
+ this.object = null;
+ }
+
public void onMessageRolledBack() {
super.onMessageRolledBack();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
Mon Oct 11 19:06:02 2010
@@ -121,14 +121,13 @@ public class ActiveMQTextMessage extends
}
}
- @Override
- public void afterMarshall(WireFormat wireFormat) throws IOException {
- super.afterMarshall(wireFormat);
- //see https://issues.apache.org/activemq/browse/AMQ-2103
- // and https://issues.apache.org/activemq/browse/AMQ-2966
- this.text=null;
+ // see https://issues.apache.org/activemq/browse/AMQ-2103
+ // and https://issues.apache.org/activemq/browse/AMQ-2966
+ public void clearMarshalledState() throws JMSException {
+ super.clearMarshalledState();
+ this.text = null;
}
-
+
/**
* Clears out the message body. Clearing a message's body does not clear
its
* header values or property entries. <p/>
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
Mon Oct 11 19:06:02 2010
@@ -94,6 +94,11 @@ public abstract class Message extends Ba
public abstract Message copy();
public abstract void clearBody() throws JMSException;
+ // useful to reduce the memory footprint of a persisted message
+ public void clearMarshalledState() throws JMSException {
+ properties = null;
+ }
+
protected void copy(Message copy) {
super.copy(copy);
copy.producerId = producerId;
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java?rev=1021466&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
Mon Oct 11 19:06:02 2010
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerTestSupport;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.usecases.MyObject;
+
+public class AMQ2103Test extends BrokerTestSupport {
+ static PolicyEntry reduceMemoryFootprint = new PolicyEntry();
+ static {
+ reduceMemoryFootprint.setReduceMemoryFootprint(true);
+ }
+
+ public PolicyEntry defaultPolicy = reduceMemoryFootprint;
+
+ @Override
+ protected PolicyEntry getDefaultPolicy() {
+ return defaultPolicy;
+ }
+
+ public void initCombosForTestVerifyMarshalledStateIsCleared() throws
Exception {
+ addCombinationValues("defaultPolicy", new Object[]{defaultPolicy,
null});
+ }
+
+ public static Test suite() {
+ return suite(AMQ2103Test.class);
+ }
+
+ /**
+ * use mem persistence so no marshaling,
+ * reduceMemoryFootprint on/off that will reduce memory by whacking the
marshaled state
+ * With vm transport and deferred serialisation and no persistence (mem
persistence),
+ * we see the message as sent by the client so we can validate the
contents against
+ * the policy
+ * @throws Exception
+ */
+ public void testVerifyMarshalledStateIsCleared() throws Exception {
+
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("vm://localhost");
+ factory.setOptimizedMessageDispatch(true);
+ factory.setObjectMessageSerializationDefered(true);
+ factory.setCopyMessageOnSend(false);
+
+ Connection connection = factory.createConnection();
+ Session session = (ActiveMQSession)connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ ActiveMQDestination destination = new ActiveMQQueue("testQ");
+ MessageConsumer consumer = session.createConsumer(destination);
+ connection.start();
+
+ MessageProducer producer = session.createProducer(destination);
+ final MyObject obj = new MyObject("A message");
+ ActiveMQObjectMessage m1 =
(ActiveMQObjectMessage)session.createObjectMessage();
+ m1.setObject(obj);
+ producer.send(m1);
+
+ ActiveMQTextMessage m2 = new ActiveMQTextMessage();
+ m2.setText("Test Message Payload.");
+ producer.send(m2);
+
+ ActiveMQMapMessage m3 = new ActiveMQMapMessage();
+ m3.setString("text", "my message");
+ producer.send(m3);
+
+ Message m = consumer.receive(maxWait);
+ assertNotNull(m);
+ assertEquals(m1.getMessageId().toString(), m.getJMSMessageID());
+ assertTrue(m instanceof ActiveMQObjectMessage);
+
+ if (getDefaultPolicy() != null) {
+ assertNull("object data cleared by reduceMemoryFootprint (and
never marshalled as using mem persistence)",
+ ((ActiveMQObjectMessage)m).getObject());
+ }
+
+ // verify no serialisation via vm transport
+ assertEquals("writeObject called", 0, obj.getWriteObjectCalled());
+ assertEquals("readObject called", 0, obj.getReadObjectCalled());
+ assertEquals("readObjectNoData called", 0,
obj.getReadObjectNoDataCalled());
+
+ m = consumer.receive(maxWait);
+ assertNotNull(m);
+ assertEquals(m2.getMessageId().toString(), m.getJMSMessageID());
+ assertTrue(m instanceof ActiveMQTextMessage);
+
+ if (getDefaultPolicy() != null) {
+ assertNull("text cleared by reduceMemoryFootprint (and never
marshalled as using mem persistence)",
+ ((ActiveMQTextMessage)m).getText());
+ }
+
+ m = consumer.receive(maxWait);
+ assertNotNull(m);
+ assertEquals(m3.getMessageId().toString(), m.getJMSMessageID());
+ assertTrue(m instanceof ActiveMQMapMessage);
+
+ if (getDefaultPolicy() != null) {
+ assertNull("text cleared by reduceMemoryFootprint (and never
marshalled as using mem persistence)",
+ ((ActiveMQMapMessage)m).getStringProperty("text"));
+ }
+
+ connection.close();
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
------------------------------------------------------------------------------
svn:keywords = Rev Date