This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch 8.0.x in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/8.0.x by this push: new 3e6ad50 QPID-8547: [Broker-J] Configurable parameters for CoalescingCommiter 3e6ad50 is described below commit 3e6ad50acdd8c49e034459ff1d352a88e0774cb8 Author: dakirily <daniel.kiril...@gmail.com> AuthorDate: Fri Jul 9 13:38:48 2021 +0200 QPID-8547: [Broker-J] Configurable parameters for CoalescingCommiter This closes #101 --- .../store/berkeleydb/CoalescingCommiter.java | 15 +++--- .../berkeleydb/StandardEnvironmentFacade.java | 18 +++++-- .../replication/ReplicatedEnvironmentFacade.java | 59 ++++++++++++++++++++-- .../virtualhost/berkeleydb/BDBVirtualHost.java | 10 ++++ .../store/berkeleydb/CoalescingCommitterTest.java | 4 +- .../src/docbkx/Java-Broker-High-Availability.xml | 5 +- 6 files changed, 96 insertions(+), 15 deletions(-) diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java index 3dceae4..a4d082d 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java @@ -40,9 +40,9 @@ public class CoalescingCommiter implements Committer { private final CommitThread _commitThread; - public CoalescingCommiter(String name, EnvironmentFacade environmentFacade) + public CoalescingCommiter(String name, int commiterNotifyThreshold, long commiterWaitTimeout, EnvironmentFacade environmentFacade) { - _commitThread = new CommitThread("Commit-Thread-" + name, environmentFacade); + _commitThread = new CommitThread("Commit-Thread-" + name, commiterNotifyThreshold, commiterWaitTimeout, environmentFacade); } @Override @@ -134,8 +134,9 @@ public class CoalescingCommiter implements Committer private static class CommitThread extends Thread { private static final Logger LOGGER = LoggerFactory.getLogger(CommitThread.class); - private static final int JOB_QUEUE_NOTIFY_THRESHOLD = 8; + private final int _jobQueueNotifyThreshold; + private final long _commiterWaitTimeout; private final AtomicBoolean _stopped = new AtomicBoolean(false); private final Queue<CommitThreadJob> _jobQueue = new ConcurrentLinkedQueue<>(); private final Object _lock = new Object(); @@ -143,9 +144,11 @@ public class CoalescingCommiter implements Committer private final List<CommitThreadJob> _inProcessJobs = new ArrayList<>(256); - public CommitThread(String name, EnvironmentFacade environmentFacade) + public CommitThread(String name, int commiterNotifyThreshold, long commiterWaitTimeout, EnvironmentFacade environmentFacade) { super(name); + this._jobQueueNotifyThreshold = commiterNotifyThreshold; + this._commiterWaitTimeout = commiterWaitTimeout; _environmentFacade = environmentFacade; } @@ -170,7 +173,7 @@ public class CoalescingCommiter implements Committer { // Periodically wake up and check, just in case we // missed a notification. Don't want to lock the broker hard. - _lock.wait(500); + _lock.wait(_commiterWaitTimeout); } catch (InterruptedException e) { @@ -248,7 +251,7 @@ public class CoalescingCommiter implements Committer throw new IllegalStateException("Commit thread is stopped"); } _jobQueue.add(commit); - if(sync || _jobQueue.size() >= JOB_QUEUE_NOTIFY_THRESHOLD) + if(sync || _jobQueue.size() >= _jobQueueNotifyThreshold) { synchronized (_lock) { diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index 3b55d24..59070a1 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; import com.google.common.util.concurrent.ListenableFuture; + import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; @@ -45,14 +46,15 @@ import com.sleepycat.je.Sequence; import com.sleepycat.je.SequenceConfig; import com.sleepycat.je.Transaction; import com.sleepycat.je.TransactionConfig; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.berkeleydb.logging.Slf4jLoggingHandler; +import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost; public class StandardEnvironmentFacade implements EnvironmentFacade { @@ -154,7 +156,17 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } } - _committer = new CoalescingCommiter(name, this); + final int commiterNotifyThreshold = configuration.getFacadeParameter( + Integer.class, + BDBVirtualHost.QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD, + BDBVirtualHost.DEFAULT_QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD + ); + final long commiterWaitTimeout = configuration.getFacadeParameter( + Long.class, + BDBVirtualHost.QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT, + BDBVirtualHost.DEFAULT_QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT + ); + _committer = new CoalescingCommiter(name, commiterNotifyThreshold, commiterWaitTimeout, this); _committer.start(); } diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 3e201ba..e3a891f 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -23,8 +23,10 @@ package org.apache.qpid.server.store.berkeleydb.replication; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; + import java.net.InetSocketAddress; import java.net.SocketTimeoutException; + import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -51,10 +53,49 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.sleepycat.je.*; + +import com.sleepycat.je.CheckpointConfig; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.DbInternal; +import com.sleepycat.je.Durability; import com.sleepycat.je.Durability.ReplicaAckPolicy; import com.sleepycat.je.Durability.SyncPolicy; -import com.sleepycat.je.rep.*; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.EnvironmentFailureException; +import com.sleepycat.je.EnvironmentMutableConfig; +import com.sleepycat.je.ExceptionEvent; +import com.sleepycat.je.LogWriteException; +import com.sleepycat.je.Sequence; +import com.sleepycat.je.SequenceConfig; +import com.sleepycat.je.Transaction; +import com.sleepycat.je.TransactionConfig; +import com.sleepycat.je.rep.AppStateMonitor; +import com.sleepycat.je.rep.InsufficientAcksException; +import com.sleepycat.je.rep.InsufficientLogException; +import com.sleepycat.je.rep.InsufficientReplicasException; +import com.sleepycat.je.rep.MasterStateException; +import com.sleepycat.je.rep.MemberNotFoundException; +import com.sleepycat.je.rep.NetworkRestore; +import com.sleepycat.je.rep.NetworkRestoreConfig; +import com.sleepycat.je.rep.NoConsistencyRequiredPolicy; +import com.sleepycat.je.rep.NodeState; +import com.sleepycat.je.rep.NodeType; +import com.sleepycat.je.rep.RepInternal; +import com.sleepycat.je.rep.ReplicaConsistencyException; +import com.sleepycat.je.rep.ReplicaWriteException; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.ReplicationGroup; +import com.sleepycat.je.rep.ReplicationMutableConfig; +import com.sleepycat.je.rep.ReplicationNode; +import com.sleepycat.je.rep.RestartRequiredException; +import com.sleepycat.je.rep.RollbackException; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; +import com.sleepycat.je.rep.UnknownMasterException; import com.sleepycat.je.rep.impl.node.NameIdPair; import com.sleepycat.je.rep.util.DbPing; import com.sleepycat.je.rep.util.ReplicationGroupAdmin; @@ -64,6 +105,7 @@ import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedExcept import com.sleepycat.je.rep.vlsn.VLSNRange; import com.sleepycat.je.utilint.PropUtil; import com.sleepycat.je.utilint.VLSN; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,6 +125,7 @@ import org.apache.qpid.server.util.DaemonThreadFactory; import org.apache.qpid.server.util.ExternalServiceException; import org.apache.qpid.server.util.ExternalServiceTimeoutException; import org.apache.qpid.server.util.ServerScopedRuntimeException; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost; public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener { @@ -1722,7 +1765,17 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan if (!_disableCoalescingCommiter && localTransactionSynchronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY) { localTransactionSynchronizationPolicy = SyncPolicy.NO_SYNC; - _coalescingCommiter = new CoalescingCommiter(_configuration.getGroupName(), this); + final int commiterNotifyThreshold = _configuration.getFacadeParameter( + Integer.class, + BDBVirtualHost.QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD, + BDBVirtualHost.DEFAULT_QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD + ); + final long commiterWaitTimeout = _configuration.getFacadeParameter( + Long.class, + BDBVirtualHost.QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT, + BDBVirtualHost.DEFAULT_QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT + ); + _coalescingCommiter = new CoalescingCommiter(_configuration.getGroupName(), commiterNotifyThreshold, commiterWaitTimeout, this); _coalescingCommiter.start(); } _realMessageStoreDurability = new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy); diff --git a/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java b/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java index 9b44503..08db3bc 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java @@ -37,11 +37,21 @@ public interface BDBVirtualHost<X extends BDBVirtualHost<X>> extends QueueManagi long BDB_MIN_CACHE_SIZE = 10*1024*1024; String QPID_BROKER_BDB_TOTAL_CACHE_SIZE = "qpid.broker.bdbTotalCacheSize"; + String QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD = "qpid.broker.bdbCommiterNotifyThreshold"; + String QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT = "qpid.broker.bdbCommiterWaitTimeout"; // Default the JE cache to 5% of total memory, but no less than 10Mb @ManagedContextDefault(name= QPID_BROKER_BDB_TOTAL_CACHE_SIZE) long DEFAULT_JE_CACHE_SIZE = Math.max(BDB_MIN_CACHE_SIZE, Runtime.getRuntime().maxMemory()/20l); + @SuppressWarnings("unused") + @ManagedContextDefault(name = QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD, description = "Threshold for amount of messages triggering BDB log flush to the disk") + int DEFAULT_QPID_BROKER_BDB_COMMITER_NOTIFY_THRESHOLD = 8; + + @SuppressWarnings("unused") + @ManagedContextDefault(name = QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT, description = "Timeout for BDB log flush to the disk") + long DEFAULT_QPID_BROKER_BDB_COMMITER_WAIT_TIMEOUT = 500L; + @Override @ManagedAttribute(mandatory = true, defaultValue = "${qpid.work_dir}${file.separator}${this:name}${file.separator}messages") String getStorePath(); diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java index 4ac18c2..f7968b8 100644 --- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java +++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommitterTest.java @@ -53,7 +53,7 @@ public class CoalescingCommitterTest extends UnitTestBase assumeThat(getVirtualHostNodeStoreType(), is(equalTo(VirtualHostNodeStoreType.BDB))); _environmentFacade = mock(EnvironmentFacade.class); - _coalescingCommitter = new CoalescingCommiter("Test", _environmentFacade); + _coalescingCommitter = new CoalescingCommiter("Test", 8, 500, _environmentFacade); _coalescingCommitter.start(); } @@ -119,4 +119,4 @@ public class CoalescingCommitterTest extends UnitTestBase verify(_environmentFacade, times(2)).flushLog(); verify(_environmentFacade, times(1)).flushLogFailed(testFailure); } -} \ No newline at end of file +} diff --git a/doc/java-broker/src/docbkx/Java-Broker-High-Availability.xml b/doc/java-broker/src/docbkx/Java-Broker-High-Availability.xml index 121e4be..ff5d7d2 100644 --- a/doc/java-broker/src/docbkx/Java-Broker-High-Availability.xml +++ b/doc/java-broker/src/docbkx/Java-Broker-High-Availability.xml @@ -294,7 +294,10 @@ <para><emphasis>NO_SYNC</emphasis>. The node immediately sends the acknowledgement. The transaction will be written and OS level buffers flushed as some point later. NO_SYNC offers the highest performance but the lowest durability level. This synchronization - policy is sometimes known as <emphasis>commit to the network</emphasis>.</para> + policy is sometimes known as <emphasis>commit to the network</emphasis>. Flushing + behavior can be influenced by virtual host context parameters "qpid.broker.bdbCommiterNotifyThreshold" + (defines threshold for amount of messages triggering BDB log flush to the disk) and + "qpid.broker.bdbCommiterWaitTimeout" (defines timeout for BDB log flush to the disk).</para> </listitem> </itemizedlist></para> <para>It is possible to assign a one policy to the master and a different policy to the --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org