Author: robbie
Date: Wed Jun  9 09:46:11 2010
New Revision: 952930

URL: http://svn.apache.org/viewvc?rev=952930&view=rev
Log:
QPID-2650: Make use of connections with auto-commit transactions disabled for 
metadata, content, and queue entries.

Additionally, make the remaining uses of auto-commit-enabled connections more 
visible and remove the erroneous explicit commits on them.
Close completed Statements after use. Add/correct various related debug log 
statements.
Stop appending the vhost name to every environment path; it is now appended 
only to the default path.

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=952930&r1=952929&r2=952930&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
 Wed Jun  9 09:46:11 2010
@@ -218,9 +218,8 @@ public class DerbyMessageStore implement
     {
         initialiseDriver();
 
-        //Update to pick up QPID_WORK and use that as the default location not 
just derbyDB
-
-        final String databasePath = 
storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, 
System.getProperty("QPID_WORK")+"/derbyDB");
+        final String databasePath = 
storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, 
+                                            
System.getProperty("QPID_WORK")+"/derbyDB/" + name);
 
         File environmentPath = new File(databasePath);
         if (!environmentPath.exists())
@@ -234,7 +233,7 @@ public class DerbyMessageStore implement
 
         CurrentActor.get().message(_logSubject, 
MessageStoreMessages.MST_STORE_LOCATION(environmentPath.getAbsolutePath()));
 
-        createOrOpenDatabase(name, databasePath);
+        createOrOpenDatabase(databasePath);
     }
 
     private static synchronized void initialiseDriver() throws 
ClassNotFoundException
@@ -245,12 +244,11 @@ public class DerbyMessageStore implement
         }
     }
 
-    private void createOrOpenDatabase(String name, final String 
environmentPath) throws SQLException
+    private void createOrOpenDatabase(final String environmentPath) throws 
SQLException
     {
-        //fixme this the _vhost name should not be added here.
-        _connectionURL = "jdbc:derby:" + environmentPath + "/" + name + 
";create=true";
+        _connectionURL = "jdbc:derby:" + environmentPath + ";create=true";
 
-        Connection conn = newConnection();
+        Connection conn = newAutoCommitConnection();
 
         createVersionTable(conn);
         createExchangeTable(conn);
@@ -394,8 +392,7 @@ public class DerbyMessageStore implement
 
     private List<String> 
loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws 
SQLException, AMQException
     {
-        Connection conn = newConnection();
-
+        Connection conn = newAutoCommitConnection();
 
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
@@ -425,6 +422,9 @@ public class DerbyMessageStore implement
 
             queues.add(queueName);
         }
+        
+        conn.close();
+        
         return queues;
     }
 
@@ -436,8 +436,7 @@ public class DerbyMessageStore implement
         Connection conn = null;
         try
         {
-            conn = newConnection();
-
+            conn = newAutoCommitConnection();
 
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
@@ -468,16 +467,12 @@ public class DerbyMessageStore implement
 
     private void 
recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, 
List<String> exchanges) throws AMQException, SQLException
     {
-
-
         _logger.info("Recovering bindings...");
 
-
-
         Connection conn = null;
         try
         {
-            conn = newConnection();
+            conn = newAutoCommitConnection();
 
             PreparedStatement stmt = 
conn.prepareStatement(SELECT_FROM_BINDINGS);
 
@@ -504,6 +499,8 @@ public class DerbyMessageStore implement
 
                 brh.binding(exchangeName, queueName, bindingKey, buf);
             }
+            
+            stmt.close();
         }
         finally
         {
@@ -544,20 +541,16 @@ public class DerbyMessageStore implement
         Connection conn = null;
         try
         {
-
-
             conn = newConnection();
             PreparedStatement stmt = 
conn.prepareStatement(DELETE_FROM_META_DATA);
             stmt.setLong(1,messageId);
             int results = stmt.executeUpdate();
-
+            stmt.close();
+            
             if (results == 0)
             {
-
-
                 throw new RuntimeException("Message metadata not found for 
message id " + messageId);
             }
-            stmt.close();
 
             if (_logger.isDebugEnabled())
             {
@@ -567,8 +560,7 @@ public class DerbyMessageStore implement
             stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
             stmt.setLong(1,messageId);
             results = stmt.executeUpdate();
-
-
+            stmt.close();
 
             conn.commit();
             conn.close();
@@ -588,7 +580,7 @@ public class DerbyMessageStore implement
                 }
             }
 
-            throw new RuntimeException("Error removing Message with id " + 
messageId + " to database: " + e, e);
+            throw new RuntimeException("Error removing message with id " + 
messageId + " from database: " + e, e);
         }
 
     }
@@ -603,13 +595,12 @@ public class DerbyMessageStore implement
 
                 try
                 {
-                    conn = newConnection();
+                    conn = newAutoCommitConnection();
 
                     PreparedStatement stmt = 
conn.prepareStatement(FIND_EXCHANGE);
                     stmt.setString(1, 
exchange.getNameShortString().toString());
                     stmt.execute();
                     stmt.close();
-                    conn.commit();
 
                     ResultSet rs = stmt.executeQuery();
 
@@ -622,7 +613,6 @@ public class DerbyMessageStore implement
                         stmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : 
(short) 0);
                         stmt.execute();
                         stmt.close();
-                        conn.commit();
                     }
 
                 }
@@ -636,7 +626,7 @@ public class DerbyMessageStore implement
             }
             catch (SQLException e)
             {
-                throw new AMQException("Error writing Exchange with name " + 
exchange.getNameShortString() + " to database: " + e, e);
+                throw new AMQException("Error adding Exchange with name " + 
exchange.getNameShortString() + " to database: " + e, e);
             }
         }
 
@@ -648,23 +638,19 @@ public class DerbyMessageStore implement
 
         try
         {
-            conn = newConnection();
+            conn = newAutoCommitConnection();
             PreparedStatement stmt = 
conn.prepareStatement(DELETE_FROM_EXCHANGE);
             stmt.setString(1, exchange.getNameShortString().toString());
             int results = stmt.executeUpdate();
+            stmt.close();
             if(results == 0)
             {
                 throw new AMQException("Exchange " + 
exchange.getNameShortString() + " not found");
             }
-            else
-            {
-                conn.commit();
-                stmt.close();
-            }
         }
         catch (SQLException e)
         {
-            throw new AMQException("Error writing deleting with name " + 
exchange.getNameShortString() + " from database: " + e, e);
+            throw new AMQException("Error deleting exchange with name " + 
exchange.getNameShortString() + " from database: " + e, e);
         }
         finally
         {
@@ -690,10 +676,9 @@ public class DerbyMessageStore implement
         {
             Connection conn = null;
 
-
             try
             {
-                conn = newConnection();
+                conn = newAutoCommitConnection();
 
                 PreparedStatement stmt = conn.prepareStatement(FIND_BINDING);
                 stmt.setString(1, exchange.getNameShortString().toString() );
@@ -726,7 +711,6 @@ public class DerbyMessageStore implement
                     }
 
                     stmt.executeUpdate();
-                    conn.commit();
                     stmt.close();
                 }
             }
@@ -761,24 +745,23 @@ public class DerbyMessageStore implement
     {
         Connection conn = null;
 
-
         try
         {
-            conn = newConnection();
+            conn = newAutoCommitConnection();
             // exchange_name varchar(255) not null, queue_name varchar(255) 
not null, binding_key varchar(255), arguments blob
             PreparedStatement stmt = 
conn.prepareStatement(DELETE_FROM_BINDINGS);
             stmt.setString(1, exchange.getNameShortString().toString() );
             stmt.setString(2, queue.getNameShortString().toString());
             stmt.setString(3, routingKey == null ? null : 
routingKey.toString());
-
-
-            if(stmt.executeUpdate() != 1)
+            
+            int result = stmt.executeUpdate();
+            stmt.close();
+            
+            if(result != 1)
             {
                  throw new AMQException("Queue binding for queue with name " + 
queue.getNameShortString() + " to exchange "
                 + exchange.getNameShortString() + "  not found");
             }
-            conn.commit();
-            stmt.close();
         }
         catch (SQLException e)
         {
@@ -817,7 +800,7 @@ public class DerbyMessageStore implement
         {
             try
             {
-                Connection conn = newConnection();
+                Connection conn = newAutoCommitConnection();
 
                 PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
                 stmt.setString(1, queue.getNameShortString().toString());
@@ -849,11 +832,8 @@ public class DerbyMessageStore implement
                     stmt.setBinaryStream(4,bis,underlying.length);
                     
                     stmt.execute();
-
                     stmt.close();
 
-                    conn.commit();
-
                     conn.close();
                 }
             }
@@ -864,9 +844,27 @@ public class DerbyMessageStore implement
         }
     }
 
+    /**
+     * Convenience method to create a new Connection configured for 
TRANSACTION_READ_COMMITED
+     * isolation and with auto-commit transactions enabled.
+     */
+    private Connection newAutoCommitConnection() throws SQLException
+    {
+        final Connection connection = newConnection();
+        connection.setAutoCommit(true);
+        
+        return connection;
+    }
+
+    /**
+     * Convenience method to create a new Connection configured for 
TRANSACTION_READ_COMMITED
+     * isolation and with auto-commit transactions disabled.
+     */
     private Connection newConnection() throws SQLException
     {
         final Connection connection = 
DriverManager.getConnection(_connectionURL);
+        connection.setAutoCommit(false);
+        
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
         return connection;
     }
 
@@ -876,22 +874,18 @@ public class DerbyMessageStore implement
         _logger.debug("public void removeQueue(AMQShortString name = " + name 
+ "): called");
         Connection conn = null;
 
-
         try
         {
-            conn = newConnection();
+            conn = newAutoCommitConnection();
             PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE);
             stmt.setString(1, name.toString());
             int results = stmt.executeUpdate();
-
-
+            stmt.close();
+            
             if (results == 0)
             {
                 throw new AMQException("Queue " + name + " not found");
             }
-
-            conn.commit();
-            stmt.close();
         }
         catch (SQLException e)
         {
@@ -930,16 +924,16 @@ public class DerbyMessageStore implement
 
         try
         {
-            PreparedStatement stmt = 
conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
-            stmt.setString(1,name);
-            stmt.setLong(2,messageId);
-            stmt.executeUpdate();
-            connWrapper.requiresCommit();
-
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Enqueuing message " + messageId + " on queue " 
+ name + "[Connection" + conn + "]");
             }
+            
+            PreparedStatement stmt = 
conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
+            stmt.setString(1,name);
+            stmt.setLong(2,messageId);
+            stmt.executeUpdate();
+            stmt.close();
         }
         catch (SQLException e)
         {
@@ -964,8 +958,7 @@ public class DerbyMessageStore implement
             stmt.setString(1,name);
             stmt.setLong(2,messageId);
             int results = stmt.executeUpdate();
-
-            connWrapper.requiresCommit();
+            stmt.close();
 
             if(results != 1)
             {
@@ -989,23 +982,12 @@ public class DerbyMessageStore implement
     private static final class ConnectionWrapper
     {
         private final Connection _connection;
-        private boolean _requiresCommit;
 
         public ConnectionWrapper(Connection conn)
         {
             _connection = conn;
         }
 
-        public void setRequiresCommit()
-        {
-            _requiresCommit = true;
-        }
-
-        public boolean requiresCommit()
-        {
-            return _requiresCommit;
-        }
-
         public Connection getConnection()
         {
             return _connection;
@@ -1071,11 +1053,7 @@ public class DerbyMessageStore implement
         try
         {
             Connection conn = connWrapper.getConnection();
-            if(connWrapper.requiresCommit())
-            {
-                conn.rollback();
-            }
-
+            conn.rollback();
             conn.close();
         }
         catch (SQLException e)
@@ -1094,6 +1072,11 @@ public class DerbyMessageStore implement
     private void storeMetaData(Connection conn, long messageId, 
StorableMessageMetaData metaData)
         throws SQLException
     {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("Adding metadata for message " +messageId);
+        }
+        
         PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA);
         stmt.setLong(1,messageId);
 
@@ -1107,8 +1090,13 @@ public class DerbyMessageStore implement
         metaData.writeToBuffer(0, buf);
         ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
         stmt.setBinaryStream(2,bis,underlying.length);
-        stmt.executeUpdate();
-
+        int result = stmt.executeUpdate();
+        stmt.close();
+        
+        if(result == 0)
+        {
+            throw new RuntimeException("Unable to add meta data for message " 
+messageId);
+        }
     }
 
 
@@ -1116,7 +1104,7 @@ public class DerbyMessageStore implement
 
     private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) 
throws SQLException
     {
-        Connection conn = newConnection();
+        Connection conn = newAutoCommitConnection();
 
         MessageStoreRecoveryHandler.StoredMessageRecoveryHandler 
messageHandler = recoveryHandler.begin();
 
@@ -1144,8 +1132,6 @@ public class DerbyMessageStore implement
             StorableMessageMetaData metaData = 
type.getFactory().createMetaData(buf);
             StoredDerbyMessage message = new StoredDerbyMessage(messageId, 
metaData, false);
             messageHandler.message(message);
-
-
         }
 
         _messageId.set(maxId);
@@ -1157,14 +1143,13 @@ public class DerbyMessageStore implement
 
     private void recoverQueueEntries(TransactionLogRecoveryHandler 
recoveryHandler) throws SQLException
     {
-        Connection conn = newConnection();
+        Connection conn = newAutoCommitConnection();
 
         TransactionLogRecoveryHandler.QueueEntryRecoveryHandler 
queueEntryHandler = recoveryHandler.begin(this);
 
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
 
-
         while(rs.next())
         {
 
@@ -1172,17 +1157,16 @@ public class DerbyMessageStore implement
             long messageId = rs.getLong(2);
             queueEntryHandler.queueEntry(queueName,messageId);
         }
-
-
+        
+        stmt.close();
 
         queueEntryHandler.completeQueueEntryRecovery();
-
     }
 
     StorableMessageMetaData getMetaData(long messageId) throws SQLException
     {
 
-        Connection conn = newConnection();
+        Connection conn = newAutoCommitConnection();
         try
         {
             PreparedStatement stmt = 
conn.prepareStatement(SELECT_FROM_META_DATA);
@@ -1191,7 +1175,8 @@ public class DerbyMessageStore implement
 
             if(rs.next())
             {
-
+                stmt.close();
+                
                 Blob dataAsBlob = rs.getBlob(1);
 
                 byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) 
dataAsBlob.length());
@@ -1205,6 +1190,8 @@ public class DerbyMessageStore implement
             }
             else
             {
+                stmt.close();
+                
                 throw new RuntimeException("Meta data not found for message 
with id " + messageId);
             }
 
@@ -1218,18 +1205,13 @@ public class DerbyMessageStore implement
 
     private void addContent(Connection conn, long messageId, int offset, 
ByteBuffer src)
     {
-
-
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("Adding content chunk offset " + offset + " for 
message " +messageId);
+        }
 
         try
         {
-            final boolean newConnection = conn == null;
-
-            if(newConnection)
-            {
-                conn = newConnection();
-            }
-
             src = src.slice();
 
             byte[] chunkData = new byte[src.limit()];
@@ -1249,12 +1231,7 @@ public class DerbyMessageStore implement
             ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
             stmt.setBinaryStream(4, bis, chunkData.length);
             stmt.executeUpdate();
-
-            if(newConnection)
-            {
-                conn.commit();
-                conn.close();
-            }
+            stmt.close();
         }
         catch (SQLException e)
         {
@@ -1270,10 +1247,9 @@ public class DerbyMessageStore implement
                 }
             }
 
-            throw new RuntimeException("Error reading AMQMessage with id " + 
messageId + " from database: " + e, e);
+            throw new RuntimeException("Error adding content chunk offset " + 
offset + " for message " + messageId + ": " + e, e);
         }
 
-
     }
 
 
@@ -1284,7 +1260,7 @@ public class DerbyMessageStore implement
 
         try
         {
-            conn = newConnection();
+            conn = newAutoCommitConnection();
 
             PreparedStatement stmt = 
conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
             stmt.setLong(1,messageId);
@@ -1317,6 +1293,7 @@ public class DerbyMessageStore implement
                 }
             }
 
+            stmt.close();
             conn.close();
             return written;
 
@@ -1335,7 +1312,7 @@ public class DerbyMessageStore implement
                 }
             }
 
-            throw new RuntimeException("Error reading AMQMessage with id " + 
messageId + " from database: " + e, e);
+            throw new RuntimeException("Error retrieving content from offset " 
+ offset + " for message " + messageId + ": " + e, e);
         }
 
 
@@ -1478,12 +1455,21 @@ public class DerbyMessageStore implement
             {
                 if(_conn != null)
                 {
+                    if(_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Flushing message " + _messageId + " to 
store");
+                    }
+                    
                     _conn.commit();
                     _conn.close();
                 }
             }
             catch (SQLException e)
             {
+                if(_logger.isDebugEnabled())
+                {
+                    _logger.debug("Error when trying to flush message " + 
_messageId + " to store: " + e);
+                }
                 throw new RuntimeException(e);
             }
             finally

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=952930&r1=952929&r2=952930&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
 Wed Jun  9 09:46:11 2010
@@ -167,9 +167,11 @@ public class VirtualHostConfigRecoveryHa
                 serverMessage = new MessageTransferMessage(message, null);
                 break;
             default:
-                throw new RuntimeException("Unknown message type retreived 
from store " + message.getMetaData().getClass());
+                throw new RuntimeException("Unknown message type retrieved 
from store " + message.getMetaData().getClass());
         }
 
+        //_logger.debug("Recovered message with id " + serverMessage);
+        
 
         _recoveredMessages.put(message.getMessageNumber(), serverMessage);
         _unusedMessages.put(message.getMessageNumber(), message);
@@ -222,7 +224,7 @@ public class VirtualHostConfigRecoveryHa
             AMQQueue queue = queueRegistry.getQueue(new 
AMQShortString(queueName));
             if (queue == null)
             {
-                _logger.error("Unkown queue: " + queueName + " cannot be bound 
to exchange: "
+                _logger.error("Unknown queue: " + queueName + " cannot be 
bound to exchange: "
                     + exchange.getNameShortString());
             }
             else
@@ -302,7 +304,7 @@ public class VirtualHostConfigRecoveryHa
                 }
                 else
                 {
-                    _logger.warn("Message id " + messageId + " referenced in 
log as enqueue in queue " + queue.getNameShortString() + " is unknwon, entry 
will be discarded");
+                    _logger.warn("Message id " + messageId + " referenced in 
log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry 
will be discarded");
                     TransactionLog.Transaction txn = 
_transactionLog.newTransaction();
                     txn.dequeueMessage(queue, messageId);
                     txn.commitTranAsync();

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java?rev=952930&r1=952929&r2=952930&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java
 Wed Jun  9 09:46:11 2010
@@ -70,13 +70,13 @@ public class PersistentStoreTest extends
         {
             Message msg = _consumer.receive(RECEIVE_TIMEOUT);
             assertNotNull("Message " + i + " not received", msg);
-            assertEquals("Did not recieve the expected message", i, 
msg.getIntProperty(INDEX));
+            assertEquals("Did not receive the expected message", i, 
msg.getIntProperty(INDEX));
         }
         
         Message msg = _consumer.receive(100);
         if(msg != null)
         {
-            fail("No more messages should be received, but received message: " 
+ msg.getIntProperty(INDEX));
+            fail("No more messages should be received, but received additional 
message with index: " + msg.getIntProperty(INDEX));
         }
     }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to