Repository: activemq-artemis Updated Branches: refs/heads/master 0ab88e609 -> db578d37a
ARTEMIS-616 Use Call timeout on replication flow control Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/246d11c6 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/246d11c6 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/246d11c6 Branch: refs/heads/master Commit: 246d11c6b19b8210f6da96cd6f71ad64b0862d50 Parents: 0ab88e6 Author: Clebert Suconic <[email protected]> Authored: Wed Jul 6 17:35:23 2016 -0400 Committer: Clebert Suconic <[email protected]> Committed: Wed Jul 6 19:12:25 2016 -0400 ---------------------------------------------------------------------- .../activemq/artemis/core/replication/ReplicationManager.java | 7 +++++-- .../artemis/core/server/cluster/ClusterConnection.java | 4 ++++ .../core/server/cluster/impl/ClusterConnectionImpl.java | 5 +++++ .../artemis/core/server/impl/SharedNothingLiveActivation.java | 2 +- .../tests/integration/replication/ReplicationTest.java | 2 +- 5 files changed, 16 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/246d11c6/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 58102d4..b254d9a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -126,6 +126,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene private CoreRemotingConnection remotingConnection; + private final long timeout; + private volatile boolean inSync = true; private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0); @@ -133,10 +135,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene /** * @param remotingConnection */ - public ReplicationManager(CoreRemotingConnection remotingConnection, final ExecutorFactory executorFactory) { + public ReplicationManager(CoreRemotingConnection remotingConnection, final long timeout, final ExecutorFactory executorFactory) { this.executorFactory = executorFactory; this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1); this.remotingConnection = remotingConnection; + this.timeout = timeout; } public void appendUpdateRecord(final byte journalID, @@ -384,7 +387,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene writable.set(false); //don't wait for ever as this may hang tests etc, we've probably been closed anyway long now = System.currentTimeMillis(); - long deadline = now + 5000; + long deadline = now + timeout; while (!writable.get() && now < deadline) { replicationLock.wait(deadline - now); now = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/246d11c6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java index 7134bd3..c47ff48 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java @@ -77,4 +77,8 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis void removeRecord(String targetNodeID); void disconnectRecord(String targetNodeID); + + long getCallTimeout(); + + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/246d11c6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 800ed5a..77f25e9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -554,6 +554,11 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn } @Override + public long getCallTimeout() { + return callTimeout; + } + + @Override public Map<String, String> getNodes() { synchronized (recordsGuard) { Map<String, String> nodes = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/246d11c6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index 6b222fb..f17bcc5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -162,7 +162,7 @@ public class SharedNothingLiveActivation extends LiveActivation { ReplicationFailureListener listener = new ReplicationFailureListener(); rc.addCloseListener(listener); rc.addFailureListener(listener); - replicationManager = new ReplicationManager(rc, activeMQServer.getExecutorFactory()); + replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), activeMQServer.getExecutorFactory()); replicationManager.start(); Thread t = new Thread(new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/246d11c6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 4a00caa..6ff0cf0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -191,7 +191,7 @@ public final class ReplicationTest extends ActiveMQTestBase { setupServer(false); try { ClientSessionFactory sf = createSessionFactory(locator); - manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), factory); + manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), factory); addActiveMQComponent(manager); manager.start(); Assert.fail("Exception was expected");
