Repository: activemq-artemis Updated Branches: refs/heads/1.x c54120a47 -> 3ac1a16f9
ARTEMIS-1269 Fixing blocked replication. If replication blocked anything on the journal, the processing from clients would be blocked and nothing would work. As part of this fix I am using an executor on ServerSessionPacketHandler, which will also scale better as the reader from Netty would be fed immediately. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/276319d7 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/276319d7 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/276319d7 Branch: refs/heads/1.x Commit: 276319d72b19ea03497882139fc768b7628576d2 Parents: c54120a Author: Clebert Suconic <[email protected]> Authored: Thu Jul 6 12:37:35 2017 -0400 Committer: Clebert Suconic <[email protected]> Committed: Fri Jul 7 08:56:31 2017 -0400 ---------------------------------------------------------------------- .../artemis/utils/OrderedExecutorFactory.java | 17 +++++ .../core/journal/impl/FileWrapperJournal.java | 80 ++++++++++++-------- .../core/ServerSessionPacketHandler.java | 42 +++++++++- .../core/impl/ActiveMQPacketHandler.java | 2 +- .../core/replication/ReplicationEndpoint.java | 45 ++++------- .../core/server/cluster/ClusterController.java | 3 + .../ExpireWhileLoadBalanceTest.java | 2 +- 7 files changed, 122 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/276319d7/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java index c7d5c03..f4c85f3 100644 --- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java @@ -18,7 +18,9 @@ package org.apache.activemq.artemis.utils; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; @@ -33,6 +35,21 @@ public final class OrderedExecutorFactory implements ExecutorFactory { private final Executor parent; + + public static boolean flushExecutor(Executor executor) { + return flushExecutor(executor, 30, TimeUnit.SECONDS); + } + + public static boolean flushExecutor(Executor executor, long timeout, TimeUnit unit) { + final CountDownLatch latch = new CountDownLatch(1); + executor.execute(latch::countDown); + try { + return latch.await(timeout, unit); + } catch (Exception e) { + return false; + } + } + /** * Construct a new instance delegating to the given parent executor. 
* http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/276319d7/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index 0b702a5..3cc1454 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -95,7 +95,7 @@ public final class FileWrapperJournal extends JournalBase { IOCompletion callback) throws Exception { JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record); - writeRecord(addRecord, sync, callback); + writeRecord(addRecord, false, -1, false, callback); } @Override @@ -106,7 +106,9 @@ public final class FileWrapperJournal extends JournalBase { * Write the record to the current file. 
*/ private void writeRecord(JournalInternalRecord encoder, - final boolean sync, + final boolean tx, + final long txID, + final boolean removeTX, final IOCompletion callback) throws Exception { lockAppend.lock(); @@ -114,30 +116,54 @@ public final class FileWrapperJournal extends JournalBase { if (callback != null) { callback.storeLineUp(); } - currentFile = journal.switchFileIfNecessary(encoder.getEncodeSize()); + testSwitchFiles(encoder); + if (txID >= 0) { + if (tx) { + AtomicInteger value; + if (removeTX) { + value = transactions.remove(txID); + } else { + value = transactions.get(txID); + } + if (value != null) { + encoder.setNumberOfRecords(value.get()); + } + } else { + count(txID); + } + } encoder.setFileID(currentFile.getRecordID()); if (callback != null) { - currentFile.getFile().write(encoder, sync, callback); + currentFile.getFile().write(encoder, false, callback); } else { - currentFile.getFile().write(encoder, sync); + currentFile.getFile().write(encoder, false); } } finally { lockAppend.unlock(); } } + private void testSwitchFiles(JournalInternalRecord encoder) throws Exception { + JournalFile oldFile = currentFile; + currentFile = journal.switchFileIfNecessary(encoder.getEncodeSize()); + if (oldFile != currentFile) { + for (AtomicInteger value : transactions.values()) { + value.set(0); + } + } + } + @Override public void appendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception { JournalInternalRecord deleteRecord = new JournalDeleteRecord(id); - writeRecord(deleteRecord, sync, callback); + writeRecord(deleteRecord, false, -1, false, callback); } @Override public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception { - count(txID); JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record); - writeRecord(deleteRecordTX, false, null); + writeRecord(deleteRecordTX, false, txID, false, null); } @Override @@ -145,10 +171,9 @@ public final class 
FileWrapperJournal extends JournalBase { long id, byte recordType, EncodingSupport record) throws Exception { - count(txID); JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record); - writeRecord(addRecord, false, null); - } + writeRecord(addRecord, false, txID, false, null); + } @Override public void appendUpdateRecord(long id, @@ -157,7 +182,7 @@ public final class FileWrapperJournal extends JournalBase { boolean sync, IOCompletion callback) throws Exception { JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record); - writeRecord(updateRecord, sync, callback); + writeRecord(updateRecord, false, -1, false, callback); } @Override @@ -165,9 +190,8 @@ public final class FileWrapperJournal extends JournalBase { long id, byte recordType, EncodingSupport record) throws Exception { - count(txID); - JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record); - writeRecord(updateRecordTX, false, null); + JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record); + writeRecord(updateRecordTX, false, txID, false, null); } @Override @@ -176,12 +200,8 @@ public final class FileWrapperJournal extends JournalBase { IOCompletion callback, boolean lineUpContext) throws Exception { JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null); - AtomicInteger value = transactions.remove(Long.valueOf(txID)); - if (value != null) { - commitRecord.setNumberOfRecords(value.get()); - } - writeRecord(commitRecord, true, callback); + writeRecord(commitRecord, true, txID, true, callback); } @Override @@ -190,20 +210,18 @@ public final class FileWrapperJournal extends JournalBase { boolean sync, IOCompletion callback) throws Exception { JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData); - AtomicInteger value = 
transactions.get(Long.valueOf(txID)); - if (value != null) { - prepareRecord.setNumberOfRecords(value.get()); - } - writeRecord(prepareRecord, sync, callback); + writeRecord(prepareRecord, true, txID, false, callback); } private int count(long txID) throws ActiveMQException { AtomicInteger defaultValue = new AtomicInteger(1); - AtomicInteger count = transactions.putIfAbsent(Long.valueOf(txID), defaultValue); + AtomicInteger count = transactions.putIfAbsent(txID, defaultValue); if (count != null) { - return count.incrementAndGet(); + count.incrementAndGet(); + } else { + count = defaultValue; } - return defaultValue.get(); + return count.intValue(); } @Override @@ -214,11 +232,7 @@ public final class FileWrapperJournal extends JournalBase { @Override public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception { JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID); - AtomicInteger value = transactions.remove(Long.valueOf(txID)); - if (value != null) { - rollbackRecord.setNumberOfRecords(value.get()); - } - writeRecord(rollbackRecord, sync, callback); + writeRecord(rollbackRecord, true, txID, true, callback); } // UNSUPPORTED STUFF http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/276319d7/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 731d6ca..5dbb4f1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -19,6 +19,7 @@ package 
org.apache.activemq.artemis.core.protocol.core; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.util.List; +import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -79,6 +80,9 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.SimpleFuture; +import org.apache.activemq.artemis.utils.SimpleFutureImpl; import org.jboss.logging.Logger; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE; @@ -129,7 +133,10 @@ public class ServerSessionPacketHandler implements ChannelHandler { private final boolean direct; - public ServerSessionPacketHandler(final ServerSession session, + private final Executor callExecutor; + + public ServerSessionPacketHandler(final Executor callExecutor, + final ServerSession session, final StorageManager storageManager, final Channel channel) { this.session = session; @@ -143,6 +150,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { //TODO think of a better way of doing this Connection conn = remotingConnection.getTransportConnection(); + this.callExecutor = callExecutor; + if (conn instanceof NettyConnection) { direct = ((NettyConnection) conn).isDirectDeliver(); } else { @@ -166,11 +175,18 @@ public class ServerSessionPacketHandler implements ChannelHandler { } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorClosingSession(e); } + flushExecutor(); ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName()); } + private void flushExecutor() { + OrderedExecutorFactory.flushExecutor(callExecutor); + } + public void close() 
{ + flushExecutor(); + channel.flushConfirmations(); try { @@ -186,6 +202,11 @@ public class ServerSessionPacketHandler implements ChannelHandler { @Override public void handlePacket(final Packet packet) { + channel.confirm(packet); + callExecutor.execute(() -> internalHandlePacket(packet)); + } + + private void internalHandlePacket(final Packet packet) { byte type = packet.getType(); storageManager.setContext(session.getSessionContext()); @@ -562,8 +583,6 @@ public class ServerSessionPacketHandler implements ChannelHandler { final boolean flush, final boolean closeChannel) { if (confirmPacket != null) { - channel.confirm(confirmPacket); - if (flush) { channel.flushConfirmations(); } @@ -587,9 +606,26 @@ public class ServerSessionPacketHandler implements ChannelHandler { remotingConnection.removeFailureListener((FailureListener) closeListener); } } + + flushExecutor(); } public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID) { + + SimpleFuture<Integer> future = new SimpleFutureImpl<>(); + callExecutor.execute(() -> { + int value = internaltransferConnection(newConnection, lastReceivedCommandID); + future.set(value); + }); + + try { + return future.get().intValue(); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + private int internaltransferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID) { // We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get // delivered // after the channel has transferred but *before* packets have been replayed - this will give the client the wrong http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/276319d7/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java ---------------------------------------------------------------------- diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index 64e496a..ad114e0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -155,7 +155,7 @@ public class ActiveMQPacketHandler implements ChannelHandler { ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext); - ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel); + ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server.getExecutorFactory().getExecutor(), session, server.getStorageManager(), channel); channel.setHandler(handler); // TODO - where is this removed? 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/276319d7/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 04488cd..0dac2cb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -27,9 +27,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; @@ -80,6 +78,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; +import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.jboss.logging.Logger; /** @@ -203,9 +202,11 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon ActiveMQServerLogger.LOGGER.invalidPacketForReplication(packet); } } catch (ActiveMQException e) { + logger.warn(e.getMessage(), e); ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet); response = new ActiveMQExceptionMessage(e); } catch (Exception e) { + logger.warn(e.getMessage(), e); ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet); 
response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e)); } @@ -277,6 +278,12 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon return; } + logger.trace("Stopping endpoint"); + + started = false; + + OrderedExecutorFactory.flushExecutor(executor); + // Channel may be null if there isn't a connection to a live server if (channel != null) { channel.close(); @@ -314,15 +321,6 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon pageManager.stop(); pageIndex.clear(); - final CountDownLatch latch = new CountDownLatch(1); - executor.execute(new Runnable() { - - @Override - public void run() { - latch.countDown(); - } - }); - latch.await(30, TimeUnit.SECONDS); // Storage needs to be the last to stop storageManager.stop(); @@ -470,28 +468,13 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon if (logger.isTraceEnabled()) { logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet); } + ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2(); + if (!started) + return replicationResponseMessage; if (packet.isSynchronizationFinished()) { - executor.execute(() -> { - try { - // this is a long running process, we cannot block the reading thread from netty - finishSynchronization(packet.getNodeID()); - if (logger.isTraceEnabled()) { - logger.trace("returning completion on synchronization catchup"); - } - channel.send(new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true)); - } catch (Exception e) { - logger.warn(e.getMessage()); - channel.send(new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e))); - } - - }); - // the write will happen through an executor - return null; - } - - ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2(); - if (!started) { + 
finishSynchronization(packet.getNodeID()); + replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true); return replicationResponseMessage; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/276319d7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java index b47dca9..ca99b23 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java @@ -318,6 +318,9 @@ public class ClusterController implements ActiveMQComponent { @Override public void handlePacket(Packet packet) { if (!isStarted()) { + if (channelHandler != null) { + channelHandler.handlePacket(packet); + } return; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/276319d7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExpireWhileLoadBalanceTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExpireWhileLoadBalanceTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExpireWhileLoadBalanceTest.java index 2205091..1ebb41a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExpireWhileLoadBalanceTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExpireWhileLoadBalanceTest.java @@ -75,7 +75,7 @@ 
public class ExpireWhileLoadBalanceTest extends ClusterTestBase { for (int i = 0; i <= 2; i++) { createQueue(i, "queues.testaddress", "queue0", null, true); getServer(i).createQueue(expiryQueue, expiryQueue, null, true, false); - getServer(i).getAddressSettingsRepository().addMatch("queues.*", as); + getServer(i).getAddressSettingsRepository().addMatch("#", as); }
