Author: gtully
Date: Mon Oct 11 19:06:02 2010
New Revision: 1021466

URL: http://svn.apache.org/viewvc?rev=1021466&view=rev
Log:
Resolve https://issues.apache.org/activemq/browse/AMQ-2103 and 
https://issues.apache.org/activemq/browse/AMQ-2966. Implement the fix for 
AMQ-2103 using a boolean policy for queues named reduceMemoryFootprint: when it 
is set, the message's internal state is cleared after the message is persisted. 
This works at a natural sync point in the broker and avoids contention. The 
contention on marshalling introduced by the original patch is what resulted in 
AMQ-2966.

Added:
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
   (with props)
Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 Mon Oct 11 19:06:02 2010
@@ -86,6 +86,7 @@ public abstract class BaseDestination im
     private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
     private boolean gcIfInactive;
     private long lastActiveTime=0l;
+    private boolean reduceMemoryFootprint = false;
 
     /**
      * @param broker
@@ -662,5 +663,12 @@ public abstract class BaseDestination im
         }
         return result;
     }
-    
+
+    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
+        this.reduceMemoryFootprint = reduceMemoryFootprint;
+    }
+
+    protected boolean isReduceMemoryFootprint() {
+        return this.reduceMemoryFootprint;
+    }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Mon Oct 11 19:06:02 2010
@@ -668,6 +668,9 @@ public class Queue extends BaseDestinati
             if (store != null && message.isPersistent()) {        
                 
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
                 result = store.asyncAddQueueMessage(context, message);
+                if (isReduceMemoryFootprint()) {
+                    message.clearMarshalledState();
+                }
             }
             if (context.isInTransaction()) {
                 // If this is a transacted message.. increase the usage now so 
that

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
 Mon Oct 11 19:06:02 2010
@@ -91,6 +91,7 @@ public class PolicyEntry extends Destina
     private boolean allConsumersExclusiveByDefault;
     private boolean gcInactiveDestinations;
     private long inactiveTimoutBeforeGC = 
BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
+    private boolean reduceMemoryFootprint;
     
    
     public void configure(Broker broker,Queue queue) {
@@ -163,6 +164,7 @@ public class PolicyEntry extends Destina
         destination.setPrioritizedMessages(isPrioritizedMessages());
         destination.setGcIfInactive(isGcInactiveDestinations());
         destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
+        destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, 
TopicSubscription subscription) {
@@ -780,5 +782,12 @@ public class PolicyEntry extends Destina
     public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
         this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
     }
+    
+    public boolean isReduceMemoryFootprint() {
+        return reduceMemoryFootprint;
+    }
 
+    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
+        this.reduceMemoryFootprint = reduceMemoryFootprint;
+    }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
 Mon Oct 11 19:06:02 2010
@@ -118,6 +118,11 @@ public class ActiveMQMapMessage extends 
         storeContent();
     }
 
+    public void clearMarshalledState() throws JMSException {
+        super.clearMarshalledState();
+        map.clear();
+    }
+
     private void storeContent() {
         try {
             if (getContent() == null && !map.isEmpty()) {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
 Mon Oct 11 19:06:02 2010
@@ -201,6 +201,11 @@ public class ActiveMQObjectMessage exten
         storeContent();
     }
 
+    public void clearMarshalledState() throws JMSException {
+        super.clearMarshalledState();
+        this.object = null;
+    }
+
     public void onMessageRolledBack() {
         super.onMessageRolledBack();
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
 Mon Oct 11 19:06:02 2010
@@ -121,14 +121,13 @@ public class ActiveMQTextMessage extends
         }
     }
 
-    @Override
-    public void afterMarshall(WireFormat wireFormat) throws IOException {
-        super.afterMarshall(wireFormat);
-        //see https://issues.apache.org/activemq/browse/AMQ-2103
-        // and https://issues.apache.org/activemq/browse/AMQ-2966
-        this.text=null;
+    // see https://issues.apache.org/activemq/browse/AMQ-2103
+    // and https://issues.apache.org/activemq/browse/AMQ-2966
+    public void clearMarshalledState() throws JMSException {
+        super.clearMarshalledState();
+        this.text = null;
     }
-    
+
     /**
      * Clears out the message body. Clearing a message's body does not clear 
its
      * header values or property entries. <p/>

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
 Mon Oct 11 19:06:02 2010
@@ -94,6 +94,11 @@ public abstract class Message extends Ba
     public abstract Message copy();
     public abstract void clearBody() throws JMSException;
 
+    // useful to reduce the memory footprint of a persisted message
+    public void clearMarshalledState() throws JMSException {
+        properties = null;
+    }
+
     protected void copy(Message copy) {
         super.copy(copy);
         copy.producerId = producerId;

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java?rev=1021466&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
 Mon Oct 11 19:06:02 2010
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerTestSupport;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.usecases.MyObject;
+
+public class AMQ2103Test extends BrokerTestSupport {
+    static PolicyEntry reduceMemoryFootprint = new PolicyEntry();
+    static {
+        reduceMemoryFootprint.setReduceMemoryFootprint(true);
+    }
+
+    public PolicyEntry defaultPolicy = reduceMemoryFootprint;
+
+    @Override
+    protected PolicyEntry getDefaultPolicy() {
+        return defaultPolicy;
+    }
+
+    public void initCombosForTestVerifyMarshalledStateIsCleared() throws 
Exception {
+        addCombinationValues("defaultPolicy", new Object[]{defaultPolicy, 
null});    
+    }
+
+    public static Test suite() {
+        return suite(AMQ2103Test.class);
+    }
+
+    /**
+     * use mem persistence so no marshaling,
+     * reduceMemoryFootprint on/off that will reduce memory by whacking the 
marshaled state
+     * With vm transport and deferred serialisation and no persistence (mem 
persistence),
+     * we see the message as sent by the client so we can validate the 
contents against
+     * the policy
+     * @throws Exception
+     */
+    public void testVerifyMarshalledStateIsCleared() throws Exception {
+
+        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory("vm://localhost");
+        factory.setOptimizedMessageDispatch(true);
+        factory.setObjectMessageSerializationDefered(true);
+        factory.setCopyMessageOnSend(false);
+
+        Connection connection = factory.createConnection();
+        Session session = (ActiveMQSession)connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        ActiveMQDestination destination = new ActiveMQQueue("testQ");
+               MessageConsumer consumer = session.createConsumer(destination);
+               connection.start();
+
+        MessageProducer producer = session.createProducer(destination);
+        final MyObject obj = new MyObject("A message");
+        ActiveMQObjectMessage m1 = 
(ActiveMQObjectMessage)session.createObjectMessage();
+        m1.setObject(obj);
+        producer.send(m1);
+
+        ActiveMQTextMessage m2 = new ActiveMQTextMessage();
+        m2.setText("Test Message Payload.");
+        producer.send(m2);
+
+        ActiveMQMapMessage m3 = new ActiveMQMapMessage();
+        m3.setString("text", "my message");
+        producer.send(m3);
+
+        Message m = consumer.receive(maxWait);
+        assertNotNull(m);
+        assertEquals(m1.getMessageId().toString(), m.getJMSMessageID());
+        assertTrue(m instanceof ActiveMQObjectMessage);
+
+        if (getDefaultPolicy() != null) {
+            assertNull("object data cleared by reduceMemoryFootprint (and 
never marshalled as using mem persistence)",
+                ((ActiveMQObjectMessage)m).getObject());
+        }
+
+        // verify no serialisation via vm transport
+        assertEquals("writeObject called", 0, obj.getWriteObjectCalled());
+        assertEquals("readObject called", 0, obj.getReadObjectCalled());
+        assertEquals("readObjectNoData called", 0, 
obj.getReadObjectNoDataCalled());
+
+        m = consumer.receive(maxWait);
+        assertNotNull(m);
+        assertEquals(m2.getMessageId().toString(), m.getJMSMessageID());
+        assertTrue(m instanceof ActiveMQTextMessage);
+
+        if (getDefaultPolicy() != null) {
+            assertNull("text cleared by reduceMemoryFootprint (and never 
marshalled as using mem persistence)",
+                ((ActiveMQTextMessage)m).getText());
+        }
+
+        m = consumer.receive(maxWait);
+        assertNotNull(m);
+        assertEquals(m3.getMessageId().toString(), m.getJMSMessageID());
+        assertTrue(m instanceof ActiveMQMapMessage);
+
+        if (getDefaultPolicy() != null) {
+            assertNull("text cleared by reduceMemoryFootprint (and never 
marshalled as using mem persistence)",
+                ((ActiveMQMapMessage)m).getStringProperty("text"));
+        }
+
+        connection.close();
+    }
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to