This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 8.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit e92ad84df4eb89e842a3a6b2f6dc91d3ddfd6a64
Author: Dedeepya T <dedeepy...@yahoo.co.in>
AuthorDate: Thu Jul 15 17:01:57 2021 +0530

    QPID-8546: [Broker-J] Use special durability for non-sync commits in BDB HA
    
    This closes #99
---
 .../store/berkeleydb/AbstractBDBMessageStore.java  |  17 +-
 .../server/store/berkeleydb/EnvironmentFacade.java |   4 +-
 .../berkeleydb/StandardEnvironmentFacade.java      |  13 +-
 .../replication/ReplicatedEnvironmentFacade.java   |  56 +++++--
 .../store/berkeleydb/BDBMessageStoreTest.java      |   2 +-
 .../ReplicatedEnvironmentFacadeTest.java           | 182 ++++++++++++++++++++-
 6 files changed, 239 insertions(+), 35 deletions(-)

diff --git 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 94d6fb1..84e1190 100644
--- 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -315,7 +315,7 @@ public abstract class AbstractBDBMessageStore implements 
MessageStore
         }
     }
 
-    void removeMessage(long messageId, boolean sync) throws StoreException
+    void removeMessage(long messageId) throws StoreException
     {
         boolean complete = false;
         Transaction tx = null;
@@ -351,7 +351,7 @@ public abstract class AbstractBDBMessageStore implements 
MessageStore
 
                     getLogger().debug("Deleted content for message {}", 
messageId);
 
-                    getEnvironmentFacade().commit(tx, sync);
+                    getEnvironmentFacade().commitNoSync(tx);
 
                     complete = true;
                     tx = null;
@@ -789,17 +789,16 @@ public abstract class AbstractBDBMessageStore implements 
MessageStore
      *
      * @throws org.apache.qpid.server.store.StoreException If the operation 
fails for any reason.
      */
-    private void commitTranImpl(final Transaction tx, boolean syncCommit) 
throws StoreException
+    private void commitTranImpl(final Transaction tx) throws StoreException
     {
         if (tx == null)
         {
             throw new StoreException("Fatal internal error: transactional is 
null at commitTran");
         }
 
-        getEnvironmentFacade().commit(tx, syncCommit);
+        getEnvironmentFacade().commit(tx);
 
-        getLogger().debug("commitTranImpl completed {} transaction {}",
-                          syncCommit ? "synchronous" : "asynchronous", tx);
+        getLogger().debug("commitTranImpl completed {} transaction 
synchronous", tx);
 
 
     }
@@ -1201,7 +1200,7 @@ public abstract class AbstractBDBMessageStore implements 
MessageStore
                         throw 
getEnvironmentFacade().handleDatabaseException("failed to begin transaction", 
e);
                     }
                     store(txn);
-                    getEnvironmentFacade().commit(txn, false);
+                    getEnvironmentFacade().commitAsync(txn, false);
 
                 }
             }
@@ -1214,7 +1213,7 @@ public abstract class AbstractBDBMessageStore implements 
MessageStore
             _messages.remove(this);
             if(stored())
             {
-                removeMessage(_messageId, false);
+                removeMessage(_messageId);
                 storedSizeChangeOccurred(-getContentSize());
             }
             if (!_messageDeleteListeners.isEmpty())
@@ -1378,7 +1377,7 @@ public abstract class AbstractBDBMessageStore implements 
MessageStore
         {
             checkMessageStoreOpen();
             doPreCommitActions();
-            AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
+            AbstractBDBMessageStore.this.commitTranImpl(_txn);
             doPostCommitActions();
             
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
         }
diff --git 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
index fcf6d78..bf5e9b7 100644
--- 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
+++ 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -67,7 +67,7 @@ public interface EnvironmentFacade
 
     Transaction beginTransaction(TransactionConfig transactionConfig);
 
-    void commit(Transaction tx, boolean sync);
+    void commit(Transaction tx);
     <X> ListenableFuture<X> commitAsync(Transaction tx, X val);
 
     RuntimeException handleDatabaseException(String contextMessage, 
RuntimeException e);
@@ -98,4 +98,6 @@ public interface EnvironmentFacade
     Map<String,Object> getDatabaseStatistics(String database, boolean reset);
 
     void deleteDatabase(String databaseName);
+
+    void commitNoSync(final Transaction tx);
 }
diff --git 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index 271c54c..3b55d24 100644
--- 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -166,7 +166,12 @@ public class StandardEnvironmentFacade implements 
EnvironmentFacade
     }
 
     @Override
-    public void commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
+    public void commit(Transaction tx)
+    {
+        commitInternal(tx, true);
+    }
+
+    private void commitInternal(final Transaction tx, final boolean syncCommit)
     {
         try
         {
@@ -184,6 +189,12 @@ public class StandardEnvironmentFacade implements 
EnvironmentFacade
     }
 
     @Override
+    public void commitNoSync(final Transaction tx)
+    {
+        commitInternal(tx, false);
+    }
+
+    @Override
     public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X 
val)
     {
         try
diff --git 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 7b15c44..3e201ba 100644
--- 
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ 
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -93,6 +93,7 @@ public class ReplicatedEnvironmentFacade implements 
EnvironmentFacade, StateChan
     public static final String ENVIRONMENT_RESTART_RETRY_LIMIT_PROPERTY_NAME = 
"qpid.bdb.ha.environment_restart_retry_limit";
     public static final String EXECUTOR_SHUTDOWN_TIMEOUT_PROPERTY_NAME = 
"qpid.bdb.ha.executor_shutdown_timeout";
     public static final String DISABLE_COALESCING_COMMITTER_PROPERTY_NAME = 
"qpid.bdb.ha.disable_coalescing_committer";
+    public static final String NO_SYNC_TX_DURABILITY_PROPERTY_NAME = 
"qpid.bdb.ha.noSyncTxDurablity";
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ReplicatedEnvironmentFacade.class);
 
@@ -103,6 +104,8 @@ public class ReplicatedEnvironmentFacade implements 
EnvironmentFacade, StateChan
     private static final int DEFAULT_ENVIRONMENT_RESTART_RETRY_LIMIT = 3;
     private static final int DEFAULT_EXECUTOR_SHUTDOWN_TIMEOUT = 5000;
     private static final boolean DEFAULT_DISABLE_COALESCING_COMMITTER = false;
+    private static final String DEFAULT_NO_SYNC_TX_DURABILITY_PROPERTY_NAME = 
"NO_SYNC,NO_SYNC,NONE";
+    private static final String DEFAULT_SYNC_TX_DURABILITY_PROPERTY_NAME = 
"SYNC,NO_SYNC,NONE";
 
     /** Length of time allowed for a master transfer to complete before the 
operation will timeout */
     private final int _masterTransferTimeout;
@@ -139,6 +142,8 @@ public class ReplicatedEnvironmentFacade implements 
EnvironmentFacade, StateChan
 
     private final boolean _disableCoalescingCommiter;
 
+    private final Durability _noSyncTxDurability;
+
     private final int _logHandlerCleanerProtectedFilesLimit;
 
     static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = 
SyncPolicy.SYNC;
@@ -284,6 +289,7 @@ public class ReplicatedEnvironmentFacade implements 
EnvironmentFacade, StateChan
         _stateChangeExecutor = 
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new 
DaemonThreadFactory("StateChange-" + _prettyGroupNodeName)));
         _groupChangeExecutor = new ScheduledThreadPoolExecutor(2, new 
DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
         _disableCoalescingCommiter = 
configuration.getFacadeParameter(Boolean.class,DISABLE_COALESCING_COMMITTER_PROPERTY_NAME,
 DEFAULT_DISABLE_COALESCING_COMMITTER);
+        _noSyncTxDurability = 
Durability.parse(configuration.getFacadeParameter(String.class, 
NO_SYNC_TX_DURABILITY_PROPERTY_NAME, 
getDefaultDurability(_disableCoalescingCommiter)));
 
         // create environment in a separate thread to avoid renaming of the 
current thread by JE
         EnvHomeRegistry.getInstance().registerHome(_environmentDirectory);
@@ -318,6 +324,18 @@ public class ReplicatedEnvironmentFacade implements 
EnvironmentFacade, StateChan
         }
     }
 
+    private String getDefaultDurability(final boolean 
disableCoalescingCommiter)
+    {
+        if (disableCoalescingCommiter)
+        {
+            return DEFAULT_SYNC_TX_DURABILITY_PROPERTY_NAME;
+        }
+        else
+        {
+            return DEFAULT_NO_SYNC_TX_DURABILITY_PROPERTY_NAME;
+        }
+    }
+
     @Override
     public Transaction beginTransaction(TransactionConfig transactionConfig)
     {
@@ -325,40 +343,48 @@ public class ReplicatedEnvironmentFacade implements 
EnvironmentFacade, StateChan
     }
 
     @Override
-    public void commit(final Transaction tx, boolean syncCommit)
+    public void commit(final Transaction tx)
     {
-        try
-        {
-            // Using commit() instead of commitNoSync() for the HA store to 
allow
-            // the HA durability configuration to influence resulting 
behaviour.
-            tx.commit(_realMessageStoreDurability);
-        }
-        catch (DatabaseException de)
-        {
-            throw handleDatabaseException("Got DatabaseException on commit, 
closing environment", de);
-        }
+        commitInternal(tx, _realMessageStoreDurability);
 
         if (_coalescingCommiter != null && 
_realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
                 && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
         {
-            _coalescingCommiter.commit(tx, syncCommit);
+            _coalescingCommiter.commit(tx, true);
         }
 
     }
 
-    @Override
-    public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X 
val)
+    private void commitInternal(final Transaction tx, final Durability 
realMessageStoreDurability)
     {
         try
         {
             // Using commit() instead of commitNoSync() for the HA store to 
allow
             // the HA durability configuration to influence resulting 
behaviour.
-            tx.commit(_realMessageStoreDurability);
+            tx.commit(realMessageStoreDurability);
         }
         catch (DatabaseException de)
         {
             throw handleDatabaseException("Got DatabaseException on commit, 
closing environment", de);
         }
+    }
+
+    @Override
+    public void commitNoSync(final Transaction tx)
+    {
+        commitInternal(tx, _noSyncTxDurability);
+
+        if (_coalescingCommiter != null && 
_realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
+            && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
+        {
+            _coalescingCommiter.commit(tx, false);
+        }
+    }
+
+    @Override
+    public <X> ListenableFuture<X> commitAsync(final Transaction tx, final X 
val)
+    {
+        commitInternal(tx, _realMessageStoreDurability);
 
         if (_coalescingCommiter != null && 
_realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
             && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
diff --git 
a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
 
b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index d3fea6e..8b03750 100644
--- 
a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ 
b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -116,7 +116,7 @@ public class BDBMessageStoreTest extends 
MessageStoreTestCase
         StoredMessage<MessageMetaData> storedMessage_0_8 = 
createAndStoreSingleChunkMessage_0_8(bdbStore);
         long messageid_0_8 = storedMessage_0_8.getMessageNumber();
 
-        bdbStore.removeMessage(messageid_0_8, true);
+        bdbStore.removeMessage(messageid_0_8);
 
         //verify the removal using the BDB store implementation methods 
directly
         try
diff --git 
a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
 
b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 90c15f7..9094f02 100644
--- 
a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ 
b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -21,11 +21,11 @@
 package org.apache.qpid.server.store.berkeleydb.replication;
 
 import static 
org.apache.qpid.server.store.berkeleydb.EnvironmentFacade.JUL_LOGGER_LEVEL_OVERRIDE;
-import static org.apache.qpid.server.store.berkeleydb.EnvironmentFacade
-        .LOG_HANDLER_CLEANER_PROTECTED_FILES_LIMIT_PROPERTY_NAME;
+import static 
org.apache.qpid.server.store.berkeleydb.EnvironmentFacade.LOG_HANDLER_CLEANER_PROTECTED_FILES_LIMIT_PROPERTY_NAME;
 import static 
org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.*;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -37,8 +37,8 @@ import static org.junit.Assume.assumeThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -69,6 +69,8 @@ import com.sleepycat.je.DatabaseConfig;
 import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.Durability;
 import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
 import com.sleepycat.je.Transaction;
 import com.sleepycat.je.TransactionConfig;
 import com.sleepycat.je.rep.NoConsistencyRequiredPolicy;
@@ -1196,18 +1198,172 @@ public class ReplicatedEnvironmentFacadeTest extends 
UnitTestBase
                    masterListener.awaitForStateChange(State.MASTER, _timeout, 
TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testNodeCommitNoSyncWithCoalescing() throws Exception
+    {
+        DatabaseConfig createConfig = new DatabaseConfig();
+        createConfig.setAllowCreate(true);
+        createConfig.setTransactional(true);
+
+        TestStateChangeListener masterListener = new TestStateChangeListener();
+        ReplicatedEnvironmentFacade node1 =
+                addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, 
true, masterListener, new NoopReplicationGroupListener(), 
false,"NO_SYNC,NO_SYNC,NONE");
+        assertTrue("Environment was not created", 
masterListener.awaitForStateChange(State.MASTER,
+                                                                               
      _timeout, TimeUnit.SECONDS));
+
+        String replicaNodeHostPort = "localhost:" + 
_portHelper.getNextAvailable();
+        String replicaName = TEST_NODE_NAME + 1;
+        ReplicatedEnvironmentFacade node2 =
+                createReplica(replicaName, replicaNodeHostPort, new 
NoopReplicationGroupListener());
+
+        Database db = node1.openDatabase("mydb", createConfig);
+
+        int key = 1;
+        String data = "value";
+        // Put a record (using commitNoSync)
+        TransactionConfig transactionConfig = 
addTestKeyValueWithCommitNoSync(node1, db, key, data);
+        db.close();
+
+        node1.close();
+        node2.close();
+
+        LOGGER.debug("RESTARTING " + TEST_NODE_NAME);
+
+        node1 = addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, 
true, masterListener, new NoopReplicationGroupListener(), 
false,"NO_SYNC,NO_SYNC,NONE");
+        boolean awaitForStateChange = 
masterListener.awaitForStateChange(State.MASTER,
+                                                                         
_timeout, TimeUnit.SECONDS);
+        LOGGER.debug("RESTARTING " + replicaName);
+        TestStateChangeListener node2StateChangeListener = new 
TestStateChangeListener();
+        node2 = addNode(replicaName,
+                        replicaNodeHostPort,
+                        false,
+                        node2StateChangeListener,
+                        new NoopReplicationGroupListener());
+        db = node1.openDatabase("mydb", DatabaseConfig.DEFAULT);
+        byte[] resultData = getTestKeyValue(node1, db, key, transactionConfig);
+        DatabaseEntry dbData = getDatabaseEntry(data);
+        assertArrayEquals(resultData, dbData.getData());
+        assertEquals("value", StringBinding.entryToString(dbData));
+
+        db.close();
+
+        LOGGER.debug("CLOSING");
+        node1.close();
+        node2.close();
+    }
+
+    @Test
+    public void testNodeCommitSyncWithoutCoalescing() throws Exception
+    {
+        DatabaseConfig createConfig = new DatabaseConfig();
+        createConfig.setAllowCreate(true);
+        createConfig.setTransactional(true);
+
+        TestStateChangeListener masterListener = new TestStateChangeListener();
+        ReplicatedEnvironmentFacade node1 =
+                addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, 
true, masterListener, new 
NoopReplicationGroupListener(),true,"SYNC,NO_SYNC,NONE");
+
+        assertTrue("Environment was not created", 
masterListener.awaitForStateChange(State.MASTER,
+                                                                               
      _timeout, TimeUnit.SECONDS));
+
+        String replicaNodeHostPort = "localhost:" + 
_portHelper.getNextAvailable();
+        String replicaName = TEST_NODE_NAME + 1;
+        ReplicatedEnvironmentFacade node2 =
+                createReplica(replicaName, replicaNodeHostPort, new 
NoopReplicationGroupListener());
+
+        Database db = node1.openDatabase("mydb", createConfig);
+
+        int key = 1;
+        String data = "value";
+        // Put a record (using commitNoSync)
+        TransactionConfig transactionConfig = 
addTestKeyValueWithCommitNoSync(node1, db, key, data);
+        db.close();
+
+        node1.close();
+        node2.close();
+
+        LOGGER.debug("RESTARTING " + TEST_NODE_NAME);
+
+        node1 = addNodeWithDurability(TEST_NODE_NAME, TEST_NODE_HOST_PORT, 
true, masterListener, new 
NoopReplicationGroupListener(),true,"SYNC,NO_SYNC,NONE");
+        boolean awaitForStateChange = 
masterListener.awaitForStateChange(State.MASTER,
+                                                                         
_timeout, TimeUnit.SECONDS);
+        LOGGER.debug("RESTARTING " + replicaName);
+        TestStateChangeListener node2StateChangeListener = new 
TestStateChangeListener();
+        node2 = addNode(replicaName,
+                        replicaNodeHostPort,
+                        false,
+                        node2StateChangeListener,
+                        new NoopReplicationGroupListener());
+        db = node1.openDatabase("mydb", DatabaseConfig.DEFAULT);
+        byte[] resultData = getTestKeyValue(node1, db, key, transactionConfig);
+        DatabaseEntry dbData = getDatabaseEntry(data);
+        assertArrayEquals(resultData, dbData.getData());
+        assertEquals("value", StringBinding.entryToString(dbData));
+
+        db.close();
+
+        LOGGER.debug("CLOSING");
+        node1.close();
+        node2.close();
+    }
+
+    private DatabaseEntry getDatabaseEntry(final String data)
+    {
+        DatabaseEntry dbData = new DatabaseEntry();
+        StringBinding.stringToEntry(data, dbData);
+        return dbData;
+    }
+    private DatabaseEntry getDatabaseEntry(final int data)
+    {
+        DatabaseEntry dbData = new DatabaseEntry();
+        IntegerBinding.intToEntry(data, dbData);
+        return dbData;
+    }
+
+    private byte[] getTestKeyValue(final ReplicatedEnvironmentFacade node1,
+                                   final Database db,
+                                   final int keyValue,
+                                   final TransactionConfig transactionConfig)
+    {
+        Transaction txn = node1.beginTransaction(transactionConfig);
+
+        DatabaseEntry key = getDatabaseEntry(keyValue);
+        DatabaseEntry result = new DatabaseEntry();
+        OperationStatus status = db.get(txn, key, result, 
LockMode.READ_UNCOMMITTED);
+        txn.commit();
+        byte[] resultData = new byte[0];
+        if (status == OperationStatus.SUCCESS)
+        {
+            resultData = result.getData();
+        }
+        return resultData;
+    }
+
+    private TransactionConfig addTestKeyValueWithCommitNoSync(final 
ReplicatedEnvironmentFacade node1,
+                                                              final Database 
db,
+                                                              final int 
keyValue, final String dataValue)
+    {
+        DatabaseEntry key = getDatabaseEntry(keyValue);
+        DatabaseEntry data = getDatabaseEntry(dataValue);
+        TransactionConfig transactionConfig = new TransactionConfig();
+        transactionConfig.setDurability(node1.getRealMessageStoreDurability());
+
+        Transaction txn = node1.beginTransaction(null);
+        db.put(txn, key, data);
+        node1.commitNoSync(txn);
+        return transactionConfig;
+    }
+
+
     private void putRecord(final ReplicatedEnvironmentFacade master, final 
Database db, final int keyValue,
                            final String dataValue)
     {
-        DatabaseEntry key = new DatabaseEntry();
-        DatabaseEntry data = new DatabaseEntry();
+        DatabaseEntry key = getDatabaseEntry(keyValue);
+        DatabaseEntry data = getDatabaseEntry(dataValue);
 
         TransactionConfig transactionConfig = new TransactionConfig();
         
transactionConfig.setDurability(master.getRealMessageStoreDurability());
         Transaction txn = master.beginTransaction(transactionConfig);
-        IntegerBinding.intToEntry(keyValue, key);
-        StringBinding.stringToEntry(dataValue, data);
-
         db.put(txn, key, data);
         txn.commit();
     }
@@ -1293,6 +1449,16 @@ public class ReplicatedEnvironmentFacadeTest extends 
UnitTestBase
        return 
addNode(nodeName,nodeHostPort,designatedPrimary,stateChangeListener,replicationGroupListener,false);
     }
 
+    private ReplicatedEnvironmentFacade addNodeWithDurability(String nodeName, 
String nodeHostPort, boolean designatedPrimary,
+                                                StateChangeListener 
stateChangeListener, ReplicationGroupListener replicationGroupListener, boolean 
disableCoalescing, String durability)
+    {
+        ReplicatedEnvironmentConfiguration config = 
createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, 
designatedPrimary,disableCoalescing);
+        when(config.getFacadeParameter(eq(String.class),
+                                     eq(NO_SYNC_TX_DURABILITY_PROPERTY_NAME),
+                                     anyString())).thenReturn(durability);
+        return createReplicatedEnvironmentFacade(nodeName, 
stateChangeListener, replicationGroupListener, config);
+
+    }
     private ReplicatedEnvironmentFacade 
createReplicatedEnvironmentFacade(String nodeName, StateChangeListener 
stateChangeListener, ReplicationGroupListener replicationGroupListener, 
ReplicatedEnvironmentConfiguration config) {
         ReplicatedEnvironmentFacade ref = new 
ReplicatedEnvironmentFacade(config);
         ref.setStateChangeListener(stateChangeListener);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to