Author: kwall
Date: Fri May 25 16:23:28 2012
New Revision: 1342712

URL: http://svn.apache.org/viewvc?rev=1342712&view=rev
Log:
QPID-4006: [Java Broker] BDB HA.

Close the BDB environment in the event of unexpected exceptions during 
transaction begin/commit.
Install BDB exception listener to log exception arising from its own threads.
Set sensible default BDB parameters for standard Qpid use-cases.
Ensure that closeInternal always closes the environment.
Added MBean operation descriptions.

Work of Robbie Gemmell <[email protected]> and myself.

Modified:
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1342712&r1=1342711&r2=1342712&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
 Fri May 25 16:23:28 2012
@@ -30,6 +30,8 @@ import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.Environment;
 import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.ExceptionEvent;
+import com.sleepycat.je.ExceptionListener;
 import com.sleepycat.je.LockConflictException;
 import com.sleepycat.je.LockMode;
 import com.sleepycat.je.OperationStatus;
@@ -83,6 +85,11 @@ public abstract class AbstractBDBMessage
 
     public static final int VERSION = 6;
 
+    private static final Map<String, String> ENVCONFIG_DEFAULTS = 
Collections.unmodifiableMap(new HashMap<String, String>()
+    {{
+        put(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7");
+    }});
+
     private Environment _environment;
 
     private String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS";
@@ -168,9 +175,6 @@ public abstract class AbstractBDBMessage
         _configRecoveryHandler = recoveryHandler;
 
         configure(name, storeConfiguration);
-
-
-
     }
 
     public void configureMessageStore(String name,
@@ -200,7 +204,6 @@ public abstract class AbstractBDBMessage
         return new BDBTransaction();
     }
 
-
     /**
      * Called after instantiation in order to configure the message store.
      *
@@ -233,18 +236,21 @@ public abstract class AbstractBDBMessage
 
         _storeLocation = storeLocation;
 
-        _envConfigMap = getConfigMap(storeConfig, "envConfig");
+        _envConfigMap = getConfigMap(ENVCONFIG_DEFAULTS, storeConfig, 
"envConfig");
 
         LOGGER.info("Configuring BDB message store");
 
         setupStore(environmentPath, name);
     }
 
-    protected Map<String,String> getConfigMap(Configuration config, String 
prefix) throws ConfigurationException
+    protected Map<String,String> getConfigMap(Map<String, String> 
defaultConfig, Configuration config, String prefix) throws 
ConfigurationException
     {
         final List<Object> argumentNames = config.getList(prefix + ".name");
         final List<Object> argumentValues = config.getList(prefix + ".value");
-        final Map<String,String> attributes = new 
HashMap<String,String>(argumentNames.size());
+        final int initialSize = argumentNames.size() + defaultConfig.size();
+
+        final Map<String,String> attributes = new 
HashMap<String,String>(initialSize);
+        attributes.putAll(defaultConfig);
 
         for (int i = 0; i < argumentNames.size(); i++)
         {
@@ -390,8 +396,14 @@ public abstract class AbstractBDBMessage
             // Clean the log before closing. This makes sure it doesn't contain
             // redundant data. Closing without doing this means the cleaner 
may not
             // get a chance to finish.
-            _environment.cleanLog();
-            _environment.close();
+            try
+            {
+                _environment.cleanLog();
+            }
+            finally
+            {
+                _environment.close();
+            }
         }
     }
 
@@ -1757,7 +1769,10 @@ public abstract class AbstractBDBMessage
             }
             catch (DatabaseException e)
             {
-                throw new RuntimeException(e);
+                LOGGER.error("Exception during transaction begin, closing 
store environment.", e);
+                closeEnvironmentSafely();
+
+                throw new RuntimeException("Exception during transaction 
begin, store environment closed.", e);
             }
         }
 
@@ -1902,10 +1917,38 @@ public abstract class AbstractBDBMessage
         EnvironmentConfig envConfig = new EnvironmentConfig();
         envConfig.setAllowCreate(true);
         envConfig.setTransactional(true);
-        envConfig.setConfigParam(EnvironmentConfig.LOCK_N_LOCK_TABLES, "7");
-    
+
         setEnvironmentConfigProperties(envConfig);
-    
+
+        envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
+
         return envConfig;
     }
+
+    protected void closeEnvironmentSafely()
+    {
+        try
+        {
+            _environment.close();
+        }
+        catch (DatabaseException ex)
+        {
+            LOGGER.error("Exception closing store environment", ex);
+        }
+        catch (IllegalStateException ex)
+        {
+            LOGGER.error("Exception closing store environment", ex);
+        }
+    }
+
+
+    private class LoggingAsyncExceptionListener implements ExceptionListener
+    {
+        @Override
+        public void exceptionThrown(ExceptionEvent event)
+        {
+            LOGGER.error("Asynchronous exception thrown by BDB thread '"
+                         + event.getThreadName() + "'", event.getException());
+        }
+    }
 }

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java?rev=1342712&r1=1342711&r2=1342712&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
 Fri May 25 16:23:28 2012
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.ber
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -75,6 +76,31 @@ public class BDBHAMessageStore extends A
     public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
     public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
 
+    private static final Map<String, String> REPCONFIG_DEFAULTS = 
Collections.unmodifiableMap(new HashMap<String, String>()
+    {{
+        /**
+         * Parameter decreased as the 24h default may lead to very large log 
files for most users.
+         */
+        put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h");
+        /**
+         * Parameter increased as the 5 s default may lead to spurious 
timeouts.
+         */
+        put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s");
+        /**
+         * Parameter increased as the 10 s default may lead to spurious 
timeouts.
+         */
+        put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s");
+        /**
+         * Parameter decreased as the 10 h default may cause user confusion.
+         */
+        put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
+        /**
+         * Parameter changed from default true so that we adopt the new 
behaviour early. False
+         * is scheduled to become default after JE 5.0.48.
+         */
+        put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, 
Boolean.FALSE.toString());
+    }});
+
     private String _groupName;
     private String _nodeName;
     private String _nodeHostPort;
@@ -89,7 +115,7 @@ public class BDBHAMessageStore extends A
     private CommitThreadWrapper _commitThreadWrapper;
     private boolean _localMultiSyncCommits;
     private boolean _autoDesignatedPrimary;
-    private Map<String, String> _repConfigMap;
+    private Map<String, String> _repConfig;
 
     @Override
     public void configure(String name, Configuration storeConfig) throws 
Exception
@@ -116,7 +142,7 @@ public class BDBHAMessageStore extends A
             _localMultiSyncCommits = false;
         }
 
-        _repConfigMap = getConfigMap(storeConfig, "repConfig");
+        _repConfig = getConfigMap(REPCONFIG_DEFAULTS, storeConfig, 
"repConfig");
 
         _managedObject = new BDBHAMessageStoreManagerMBean(this);
         _managedObject.register();
@@ -337,7 +363,18 @@ public class BDBHAMessageStore extends A
     {
         // Using commit() instead of commitNoSync() for the HA store to allow
         // the HA durability configuration to influence resulting behaviour.
-        tx.commit();
+        try
+        {
+            tx.commit();
+        }
+        catch (DatabaseException de)
+        {
+            LOGGER.error("Got DatabaseException on commit, closing 
environment", de);
+
+            closeEnvironmentSafely();
+
+            throw de;
+        }
 
         if(_localMultiSyncCommits)
         {
@@ -401,7 +438,7 @@ public class BDBHAMessageStore extends A
 
     private void setReplicationConfigProperties(ReplicationConfig 
replicationConfig)
     {
-        for (Map.Entry<String, String> configItem : _repConfigMap.entrySet())
+        for (Map.Entry<String, String> configItem : _repConfig.entrySet())
         {
             if (LOGGER.isDebugEnabled())
             {

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java?rev=1342712&r1=1342711&r2=1342712&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
 Fri May 25 16:23:28 2012
@@ -83,45 +83,69 @@ public class BDBHAMessageStoreManagerMBe
     }
 
     @Override
-    public String getGroupName() throws IOException
+    public String getGroupName()
     {
         return _store.getGroupName();
     }
 
     @Override
-    public String getNodeName() throws IOException
+    public String getNodeName()
     {
         return _store.getNodeName();
     }
 
     @Override
-    public String getNodeHostPort() throws IOException
+    public String getNodeHostPort()
     {
         return _store.getNodeHostPort();
     }
 
     @Override
-    public String getHelperHostPort() throws IOException
+    public String getHelperHostPort()
     {
         return _store.getHelperHostPort();
     }
 
     @Override
-    public String getReplicationPolicy() throws IOException
+    public String getReplicationPolicy() throws IOException, JMException
     {
-        return _store.getReplicationPolicy();
+        try
+        {
+            return _store.getReplicationPolicy();
+        }
+        catch (RuntimeException e)
+        {
+            LOGGER.debug("Failed query replication policy", e);
+            throw new JMException(e.getMessage());
+        }
     }
 
     @Override
-    public String getNodeState() throws IOException
+    public String getNodeState() throws IOException, JMException
     {
-        return _store.getNodeState();
+        try
+        {
+            return _store.getNodeState();
+        }
+        catch (RuntimeException e)
+        {
+            LOGGER.debug("Failed query node state", e);
+            throw new JMException(e.getMessage());
+        }
     }
 
     @Override
-    public boolean getDesignatedPrimary() throws IOException
+    public boolean getDesignatedPrimary() throws IOException, JMException
     {
-        return _store.isDesignatedPrimary();
+        try
+        {
+            return _store.isDesignatedPrimary();
+        }
+        catch (RuntimeException e)
+        {
+            LOGGER.debug("Failed query designated primary", e);
+            throw new JMException(e.getMessage());
+        }
     }
 
     @Override

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1342712&r1=1342711&r2=1342712&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
 Fri May 25 16:23:28 2012
@@ -88,7 +88,18 @@ public class BDBMessageStore extends Abs
     @Override
     protected StoreFuture commit(com.sleepycat.je.Transaction tx, boolean 
syncCommit) throws DatabaseException
     {
-        tx.commitNoSync();
+        try
+        {
+            tx.commitNoSync();
+        }
+        catch(DatabaseException de)
+        {
+            LOGGER.error("Got DatabaseException on commit, closing 
environment", de);
+
+            closeEnvironmentSafely();
+
+            throw de;
+        }
 
         return _commitThreadWrapper.commit(tx, syncCommit);
     }

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java?rev=1342712&r1=1342711&r2=1342712&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
 Fri May 25 16:23:28 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.store.berkeleydb;
 
 import java.util.Queue;
@@ -110,7 +130,6 @@ public class CommitThreadWrapper
                 }
                 catch (InterruptedException e)
                 {
-                    //TODO Should we ignore, or throw a 'StoreException'?
                     throw new RuntimeException(e);
                 }
             }
@@ -127,6 +146,8 @@ public class CommitThreadWrapper
      */
     private static class CommitThread extends Thread
     {
+        private static final Logger LOGGER = 
Logger.getLogger(CommitThread.class);
+
         private final AtomicBoolean _stopped = new AtomicBoolean(false);
         private final Queue<BDBCommitFuture> _jobQueue = new 
ConcurrentLinkedQueue<BDBCommitFuture>();
         private final CheckpointConfig _config = new CheckpointConfig();
@@ -188,13 +209,30 @@ public class CommitThreadWrapper
             }
             catch (DatabaseException e)
             {
-                for(int i = 0; i < size; i++)
+                try
                 {
-                    BDBCommitFuture commit = _jobQueue.poll();
-                    commit.abort(e);
+                    LOGGER.error("Exception during environment log flush", e);
+
+                    for(int i = 0; i < size; i++)
+                    {
+                        BDBCommitFuture commit = _jobQueue.poll();
+                        commit.abort(e);
+                    }
                 }
-            }
+                finally
+                {
+                    LOGGER.error("Closing store environment", e);
 
+                    try
+                    {
+                        _environment.close();
+                    }
+                    catch (DatabaseException ex)
+                    {
+                        LOGGER.error("Exception closing store environment", 
ex);
+                    }
+                }
+            }
         }
 
         private boolean hasJobs()

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java?rev=1342712&r1=1342711&r2=1342712&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
 Fri May 25 16:23:28 2012
@@ -25,6 +25,7 @@ import javax.management.JMException;
 import javax.management.openmbean.TabularData;
 
 import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
+import 
org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
 
 public interface ManagedBDBHAMessageStore
@@ -40,36 +41,38 @@ public interface ManagedBDBHAMessageStor
     public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary";
 
     @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the 
group")
-    String getGroupName() throws IOException;
+    String getGroupName() throws IOException, JMException;
 
     @MBeanAttribute(name=ATTR_NODE_NAME, description="Unique name identifying 
the node within the group")
-    String getNodeName() throws IOException;
+    String getNodeName() throws IOException, JMException;
 
     @MBeanAttribute(name=ATTR_NODE_HOST_PORT, description="Host/port used to 
replicate data between this node and others in the group")
-    String getNodeHostPort() throws IOException;
+    String getNodeHostPort() throws IOException, JMException;
 
     @MBeanAttribute(name=ATTR_NODE_STATE, description="Current state of this 
node")
-    String getNodeState() throws IOException;
+    String getNodeState() throws IOException, JMException;
 
     @MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to 
allow a new node to discover other group members")
-    String getHelperHostPort() throws IOException;
+    String getHelperHostPort() throws IOException, JMException;
 
     @MBeanAttribute(name=ATTR_REPLICATION_POLICY, description="Replication 
policy")
-    String getReplicationPolicy() throws IOException;
+    String getReplicationPolicy() throws IOException, JMException;
 
     @MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated 
primary flag. Applicable to the two node case.")
-    boolean getDesignatedPrimary() throws IOException;
+    boolean getDesignatedPrimary() throws IOException, JMException;
 
     @MBeanOperation(name="getAllNodesInGroup", description="Get all nodes 
within the group, regardless of whether currently attached or not")
     TabularData getAllNodesInGroup() throws IOException, JMException;
 
     @MBeanOperation(name="removeNodeFromGroup", description="Remove an 
existing node from the group")
-    void removeNodeFromGroup(String nodeName) throws JMException;
+    void removeNodeFromGroup(@MBeanOperationParameter(name="nodeName", 
description="name of node")String nodeName) throws JMException;
 
     @MBeanOperation(name="setDesignatedPrimary", description="Set/unset this 
node as the designated primary for the group. Applicable to the two node case.")
-    void setDesignatedPrimary(boolean primary) throws JMException;
+    void setDesignatedPrimary(@MBeanOperationParameter(name="primary", 
description="designated primary")boolean primary) throws JMException;
 
     @MBeanOperation(name="updateAddress", description="Update the address of 
another node. The node must be in a STOPPED state.")
-    void updateAddress(String nodeName, String newHostName, int newPort) 
throws JMException;
+    void updateAddress(@MBeanOperationParameter(name="nodeName", 
description="name of node")String nodeName,
+                       @MBeanOperationParameter(name="newHostName", 
description="new hostname")String newHostName,
+                       @MBeanOperationParameter(name="newPort", 
description="new port number")int newPort) throws JMException;
 }
 

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java?rev=1342712&r1=1342711&r2=1342712&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
 Fri May 25 16:23:28 2012
@@ -32,6 +32,8 @@ import org.apache.qpid.jms.ConnectionLis
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
+import com.sleepycat.je.rep.ReplicationConfig;
+
 /**
  * The HA black box tests test the BDB cluster as a opaque unit.  Client 
connects to
  * the cluster via a failover url

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java?rev=1342712&r1=1342711&r2=1342712&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
 Fri May 25 16:23:28 2012
@@ -64,9 +64,6 @@ public class HAClusterManagementTest ext
     protected void setUp() throws Exception
     {
         _brokerType = BrokerType.SPAWNED;
-
-        assertTrue(isJavaBroker());
-        assertTrue(isBrokerStorePersistent());
         _jmxUtils.setUp();
 
         _clusterCreator.configureClusterNodes();

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java?rev=1342712&r1=1342711&r2=1342712&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
 Fri May 25 16:23:28 2012
@@ -83,13 +83,13 @@ public class HAClusterTwoNodeTest extend
     {
         setSystemProperty("java.util.logging.config.file", "etc" + 
File.separator + "log.properties");
 
-        String vhostPrefix = "virtualhosts.virtualhost." + VIRTUAL_HOST;
+        String storeConfigKeyPrefix = 
_clusterCreator.getStoreConfigKeyPrefix();
 
-        setConfigurationProperty(vhostPrefix + ".store.repConfig(0).name", 
ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT);
-        setConfigurationProperty(vhostPrefix + ".store.repConfig(0).value", "2 
s");
+        setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).name", 
ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT);
+        setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(0).value", 
"2 s");
 
-        setConfigurationProperty(vhostPrefix + ".store.repConfig(1).name", 
ReplicationConfig.ELECTIONS_PRIMARY_RETRIES);
-        setConfigurationProperty(vhostPrefix + ".store.repConfig(1).value", 
"0");
+        setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).name", 
ReplicationConfig.ELECTIONS_PRIMARY_RETRIES);
+        setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", 
"0");
 
         _clusterCreator.configureClusterNodes();
         _clusterCreator.setAutoDesignatedPrimary(autoDesignedPrimary);

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java?rev=1342712&r1=1342711&r2=1342712&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterWhiteboxTest.java
 Fri May 25 16:23:28 2012
@@ -35,6 +35,8 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.url.URLSyntaxException;
 
+import com.sleepycat.je.rep.ReplicationConfig;
+
 /**
  * The HA white box tests test the BDB cluster where the test retains the 
knowledge of the
  * individual test nodes.  It uses this knowledge to examine the nodes  to 
ensure that they

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java?rev=1342712&r1=1342711&r2=1342712&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
 Fri May 25 16:23:28 2012
@@ -53,8 +53,8 @@ public class HATestClusterCreator
     private static final String MANY_BROKER_URL_FORMAT = 
"amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''";
     private static final String BROKER_PORTION_FORMAT = 
"tcp://localhost:%d?connectdelay='%d',retries='%d'";
 
-    private static final int FAILOVER_CYCLECOUNT = 2;
-    private static final int FAILOVER_RETRIES = 2;
+    private static final int FAILOVER_CYCLECOUNT = 5;
+    private static final int FAILOVER_RETRIES = 1;
     private static final int FAILOVER_CONNECTDELAY = 1000;
 
     private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = 
"amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''";
@@ -67,7 +67,7 @@ public class HATestClusterCreator
     private final Map<Integer, Integer> _brokerPortToBdbPortMap = new 
HashMap<Integer, Integer>();
     private final Map<Integer, BrokerConfigHolder> _brokerConfigurations = new 
TreeMap<Integer, BrokerConfigHolder>();
     private final String _virtualHostName;
-    private final String _configKeyPrefix;
+    private final String _storeConfigKeyPrefix;
 
     private final String _ipAddressOfBroker;
     private final String _groupName ;
@@ -82,7 +82,7 @@ public class HATestClusterCreator
         _groupName = "group" + _testcase.getName();
         _ipAddressOfBroker = getIpAddressOfBrokerHost();
         _numberOfNodes = numberOfNodes;
-        _configKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName + 
".store.";
+        _storeConfigKeyPrefix = "virtualhosts.virtualhost." + _virtualHostName 
+ ".store.";
         _bdbHelperPort = 0;
     }
 
@@ -127,7 +127,7 @@ public class HATestClusterCreator
      */
     private String getConfigKey(String configKeySuffix)
     {
-        final String configKey = StringUtils.substringAfter(_configKeyPrefix + 
configKeySuffix, "virtualhosts.");
+        final String configKey = 
StringUtils.substringAfter(_storeConfigKeyPrefix + configKeySuffix, 
"virtualhosts.");
         return configKey;
     }
 
@@ -348,12 +348,12 @@ public class HATestClusterCreator
     {
         final String nodeName = getNodeNameForNodeAt(bdbPort);
 
-        _testcase.setConfigurationProperty(_configKeyPrefix + "class", 
"org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore");
+        _testcase.setConfigurationProperty(_storeConfigKeyPrefix + "class", 
"org.apache.qpid.server.store.berkeleydb.BDBHAMessageStore");
 
-        _testcase.setConfigurationProperty(_configKeyPrefix + 
"highAvailability.groupName", _groupName);
-        _testcase.setConfigurationProperty(_configKeyPrefix + 
"highAvailability.nodeName", nodeName);
-        _testcase.setConfigurationProperty(_configKeyPrefix + 
"highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort));
-        _testcase.setConfigurationProperty(_configKeyPrefix + 
"highAvailability.helperHostPort", getHelperHostPort());
+        _testcase.setConfigurationProperty(_storeConfigKeyPrefix + 
"highAvailability.groupName", _groupName);
+        _testcase.setConfigurationProperty(_storeConfigKeyPrefix + 
"highAvailability.nodeName", nodeName);
+        _testcase.setConfigurationProperty(_storeConfigKeyPrefix + 
"highAvailability.nodeHostPort", getNodeHostPortForNodeAt(bdbPort));
+        _testcase.setConfigurationProperty(_storeConfigKeyPrefix + 
"highAvailability.helperHostPort", getHelperHostPort());
     }
 
     public String getIpAddressOfBrokerHost()
@@ -413,4 +413,11 @@ public class HATestClusterCreator
         virtualHostConfig.setProperty(configKey, newBdbHostPort);
         collectConfig(brokerPortNumberToBeMoved, brokerConfigHolder.getTestConfiguration(), virtualHostConfig);
     }
+
+    public String getStoreConfigKeyPrefix()
+    {
+        return _storeConfigKeyPrefix;
+    }
+
+
 }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=1342712&r1=1342711&r2=1342712&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
 Fri May 25 16:23:28 2012
@@ -22,11 +22,9 @@ package org.apache.qpid.server.connectio
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.common.Closeable;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.transport.TransportException;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -65,19 +63,15 @@ public class ConnectionRegistry implemen
         }
     }
 
-    public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
+    private void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
     {
         try
         {
             connection.close(cause, message);
         }
-        catch (TransportException e)
+        catch (Exception e)
         {
-            _logger.warn("Error closing connection:" + e.getMessage());
-        }
-        catch (AMQException e)
-        {
-            _logger.warn("Error closing connection:" + e.getMessage());
+            _logger.warn("Exception closing connection", e);
         }
     }
 

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java?rev=1342712&r1=1342711&r2=1342712&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
 Fri May 25 16:23:28 2012
@@ -37,8 +37,6 @@ public interface IConnectionRegistry
 
     public void close(String replyText) throws AMQException;
 
-    public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message);
-
     public List<AMQConnectionModel> getConnections();
 
     public void registerConnection(AMQConnectionModel connnection);

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1342712&r1=1342711&r2=1342712&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
 Fri May 25 16:23:28 2012
@@ -875,12 +875,18 @@ public class AMQProtocolEngine implement
 
             markChannelAwaitingCloseOk(channelId);
             closeSession();
-            _stateManager.changeState(AMQState.CONNECTION_CLOSING);
-            writeFrame(e.getCloseFrame(channelId));
         }
         finally
         {
-            closeProtocolSession();
+            try
+            {
+                _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+                writeFrame(e.getCloseFrame(channelId));
+            }
+            finally
+            {
+                closeProtocolSession();
+            }
         }
 
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to