ARTEMIS-612 Improving Failback's max replication The server will always restart now, with older files being removed. The system will now move current data into ./oldreplica.#, and remove old ones. All the logic for moving these files is encapsulated at FileMoveManager.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/81541200 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/81541200 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/81541200 Branch: refs/heads/master Commit: 8154120027ba2b758ae3632204831247288bcc99 Parents: 62f414f Author: Clebert Suconic <[email protected]> Authored: Mon Jun 27 14:52:59 2016 -0400 Committer: jbertram <[email protected]> Committed: Tue Jul 5 16:51:23 2016 -0500 ---------------------------------------------------------------------- .../core/paging/impl/PagingStoreFactoryNIO.java | 7 +- .../artemis/core/server/ActiveMQServer.java | 6 + .../core/server/ActiveMQServerLogger.java | 7 +- .../core/server/impl/ActiveMQServerImpl.java | 127 +++---- .../core/server/impl/FileMoveManager.java | 218 ++++++++++++ .../impl/SharedNothingBackupActivation.java | 4 +- .../impl/SharedNothingLiveActivation.java | 20 +- .../core/server/impl/FileMoveManagerTest.java | 346 +++++++++++++++++++ .../artemis/tests/util/ActiveMQTestBase.java | 13 +- tests/config/logging.properties.trace | 4 +- .../cluster/failover/BackupSyncJournalTest.java | 24 +- .../cluster/failover/FailoverTest.java | 74 ++-- .../failover/LiveToLiveFailoverTest.java | 2 +- .../failover/ReplicatedFailoverTest.java | 25 +- 14 files changed, 746 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java index 00da382..d81591c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java @@ -40,6 +40,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.impl.FileMoveManager; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.utils.ExecutorFactory; @@ -147,7 +148,11 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory { final File addressFile = new File(file, PagingStoreFactoryNIO.ADDRESS_FILE); if (!addressFile.exists()) { - ActiveMQServerLogger.LOGGER.pageStoreFactoryNoIdFile(file.toString(), PagingStoreFactoryNIO.ADDRESS_FILE); + + // This means this folder is from a replication copy, nothing to worry about it, we just skip it + if (!file.getName().contains(FileMoveManager.PREFIX)) { + ActiveMQServerLogger.LOGGER.pageStoreFactoryNoIdFile(file.toString(), PagingStoreFactoryNIO.ADDRESS_FILE); + } continue; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index e416205..af2f7cf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -94,6 +94,12 @@ public interface ActiveMQServer extends ActiveMQComponent { NodeManager getNodeManager(); + /** it will release hold a lock for the activation. */ + void unlockActivation(); + + /** it will hold a lock for the activation. This will prevent the activation from happening. */ + void lockActivation(); + /** * Returns the resource to manage this ActiveMQ Artemis server. * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index ba08b7b..b9ac8a4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -998,7 +998,7 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 222161, value = "Group Handler timed-out waiting for sendCondition", format = Message.Format.MESSAGE_FORMAT) void groupHandlerSendTimeout(); - @LogMessage(level = Logger.Level.WARN) + @LogMessage(level = Logger.Level.INFO) @Message(id = 222162, value = "Moving data directory {0} to {1}", format = Message.Format.MESSAGE_FORMAT) void backupMovingDataAway(String oldPath, String newPath); @@ -1219,6 +1219,11 @@ public interface ActiveMQServerLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void sslHandshakeFailed(String clientAddress, String cause); + @LogMessage(level = Logger.Level.INFO) + @Message(id = 222209, value = "There were too many old replicated folders upon startup, removing {0}", + format = Message.Format.MESSAGE_FORMAT) + void removingBackupData(String path); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index d2d7783..8acdc11 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.server.impl; import javax.management.MBeanServer; import javax.security.cert.X509Certificate; import java.io.File; -import java.io.FilenameFilter; +import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.lang.management.ManagementFactory; @@ -39,6 +39,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -253,6 +254,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { private final Map<String, ServerSession> sessions = new ConcurrentHashMap<>(); + private final Semaphore activationLock = new Semaphore(1); /** * This class here has the same principle of CountDownLatch but you can reuse the counters. * It's based on the same super classes of {@code CountDownLatch} @@ -436,7 +438,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO); } - backupActivationThread = new Thread(activation, ActiveMQMessageBundle.BUNDLE.activationForServer(this)); + if (logger.isTraceEnabled()) { + logger.trace("starting backupActivation"); + } + backupActivationThread = new ActivationThread(activation, ActiveMQMessageBundle.BUNDLE.activationForServer(this)); backupActivationThread.start(); } else { @@ -453,6 +458,21 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override + public void unlockActivation() { + activationLock.release(); + } + + @Override + public void lockActivation() { + try { + activationLock.acquire(); + } + catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + + @Override protected final void finalize() throws Throwable { if (state != SERVER_STATE.STOPPED) { ActiveMQServerLogger.LOGGER.serverFinalisedWIthoutBeingSTopped(); @@ -510,6 +530,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public void setHAPolicy(HAPolicy haPolicy) { + if (logger.isTraceEnabled()) { + logger.tracef("XXX @@@ Setting %s, isBackup=%s at %s", haPolicy, haPolicy.isBackup(), this); + } this.haPolicy = haPolicy; } @@ -707,6 +730,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { * @param criticalIOError whether we have encountered an IO error with the journal etc */ void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting) { + synchronized (this) { if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING) { return; @@ -2202,7 +2226,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { /** * Check if journal directory exists or create it (if configured to do so) */ - void checkJournalDirectory() { + public void checkJournalDirectory() { File journalDir = configuration.getJournalLocation(); if (!journalDir.exists() && configuration.isPersistenceEnabled()) { @@ -2269,86 +2293,18 @@ public class ActiveMQServerImpl implements ActiveMQServer { return scaledDownNodeIDs.contains(scaledDownNodeId); } - int countNumberOfCopiedJournals() { - //will use the main journal to check for how many backups have been kept - File journalDir = new File(configuration.getJournalDirectory()); - final String fileName = journalDir.getName(); - int numberOfbackupsSaved = 0; - //fine if it doesn't exist, we aren't using file based persistence so it's no issue - if (journalDir.exists()) { - File parentFile = new File(journalDir.getParent()); - String[] backupJournals = parentFile.list(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.startsWith(fileName) && !name.matches(fileName); - } - }); - numberOfbackupsSaved = backupJournals != null ? backupJournals.length : 0; - } - return numberOfbackupsSaved; - } - /** * Move data away before starting data synchronization for fail-back. * <p> * Use case is a server, upon restarting, finding a former backup running in its place. It will * move any older data away and log a warning about it. */ - void moveServerData() { + void moveServerData(int maxSavedReplicated) throws IOException { File[] dataDirs = new File[]{configuration.getBindingsLocation(), configuration.getJournalLocation(), configuration.getPagingLocation(), configuration.getLargeMessagesLocation()}; - boolean allEmpty = true; - int lowestSuffixForMovedData = 1; - boolean redo = true; - - while (redo) { - redo = false; - for (File fDir : dataDirs) { - if (fDir.exists()) { - if (!fDir.isDirectory()) { - throw ActiveMQMessageBundle.BUNDLE.journalDirIsFile(fDir); - } - - if (fDir.list().length > 0) - allEmpty = false; - } - - String sanitizedPath = fDir.getPath(); - while (new File(sanitizedPath + lowestSuffixForMovedData).exists()) { - lowestSuffixForMovedData++; - redo = true; - } - } - } - if (allEmpty) - return; - - for (File dir : dataDirs) { - File newPath = new File(dir.getPath() + lowestSuffixForMovedData); - if (dir.exists()) { - if (!dir.renameTo(newPath)) { - throw ActiveMQMessageBundle.BUNDLE.couldNotMoveJournal(dir); - } - - ActiveMQServerLogger.LOGGER.backupMovingDataAway(dir.getAbsolutePath(), newPath.getPath()); - } - /* - * sometimes OS's can hold on to file handles for a while so we need to check this actually qorks and then wait - * a while and try again if it doesn't - * */ - - int count = 0; - while (!dir.exists() && !dir.mkdir()) { - try { - Thread.sleep(1000); - } - catch (InterruptedException e) { - } - count++; - if (count == 5) { - throw ActiveMQMessageBundle.BUNDLE.cannotCreateDir(dir.getPath()); - } - } + for (File data : dataDirs) { + FileMoveManager moveManager = new FileMoveManager(data, maxSavedReplicated); + moveManager.doMove(); } } @@ -2371,4 +2327,25 @@ public class ActiveMQServerImpl implements ActiveMQServer { return new Date().getTime() - startDate.getTime(); } + + + private final class ActivationThread extends Thread { + final Runnable runnable; + + ActivationThread(Runnable runnable, String name) { + super(name); + this.runnable = runnable; + } + + public void run() { + lockActivation(); + try { + runnable.run(); + } + finally { + unlockActivation(); + } + } + + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileMoveManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileMoveManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileMoveManager.java new file mode 100644 index 0000000..efe1bb2 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileMoveManager.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; + +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.jboss.logging.Logger; + +/** + * Used to move files away. + * Each time a backup starts its formeter data will be moved to a backup folder called bkp.1, bkp.2, ... etc + * We may control the maximum number of folders so we remove old ones. + */ +public class FileMoveManager { + private static final Logger logger = Logger.getLogger(FileMoveManager.class); + + private final File folder; + private int maxFolders; + public static final String PREFIX = "oldreplica."; + + private static final FilenameFilter isPrefix = new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + boolean prefixed = name.contains(PREFIX); + + if (prefixed) { + try { + Integer.parseInt(name.substring(PREFIX.length())); + } + catch (NumberFormatException e) { + // This function is not really used a lot + // so I don't really mind about performance here + // this is good enough for what we need + prefixed = false; + } + } + + return prefixed; + } + }; + + private static final FilenameFilter notPrefix = new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return !isPrefix.accept(dir, name); + } + }; + + + public FileMoveManager(File folder) { + this(folder, -1); + } + + public FileMoveManager(File folder, int maxFolders) { + this.folder = folder; + this.maxFolders = maxFolders; + } + + public int getMaxFolders() { + return maxFolders; + } + + public FileMoveManager setMaxFolders(int maxFolders) { + this.maxFolders = maxFolders; + return this; + } + + public void doMove() throws IOException { + String[] files = getFiles(); + + if (files == null || files.length == 0) { + // if no files, nothing to be done, no backup, no deletes... nothing! + return; + } + + // Since we will create one folder, we are already taking that one into consideration + internalCheckOldFolders(1); + + int whereToMove = getMaxID() + 1; + + File folderTo = getFolder(whereToMove); + folderTo.mkdirs(); + + ActiveMQServerLogger.LOGGER.backupMovingDataAway(folder.getPath(), folderTo.getPath()); + + for (String fileMove : files) { + File fileFrom = new File(folder, fileMove); + File fileTo = new File(folderTo, fileMove); + logger.tracef("doMove:: moving %s as %s", fileFrom, fileTo); + Files.move(fileFrom.toPath(), fileTo.toPath()); + } + + } + + public void checkOldFolders() { + internalCheckOldFolders(0); + } + + private void internalCheckOldFolders(int creating) { + if (maxFolders > 0) { + int folders = getNumberOfFolders(); + + // We are counting the next one to be created + int foldersToDelete = folders + creating - maxFolders; + + if (foldersToDelete > 0) { + logger.tracef("There are %d folders to delete", foldersToDelete); + int[] ids = getIDlist(); + for (int i = 0; i < foldersToDelete; i++) { + File file = getFolder(ids[i]); + ActiveMQServerLogger.LOGGER.removingBackupData(file.getPath()); + deleteTree(file); + } + } + } + } + + /** + * It will return non backup folders + */ + public String[] getFiles() { + return folder.list(notPrefix); + } + + + public int getNumberOfFolders() { + return getFolders().length; + } + + public String[] getFolders() { + String[] list = folder.list(isPrefix); + + if (list == null) { + list = new String[0]; + } + + + return list; + } + + + public int getMinID() { + int[] list = getIDlist(); + + if (list.length == 0) { + return 0; + } + + return list[0]; + } + + public int getMaxID() { + int[] list = getIDlist(); + + if (list.length == 0) { + return 0; + } + + return list[list.length - 1]; + } + + + public int[] getIDlist() { + String[] list = getFolders(); + int[] ids = new int[list.length]; + for (int i = 0; i < ids.length; i++) { + ids[i] = getID(list[i]); + } + + Arrays.sort(ids); + + return ids; + } + + public int getID(String folderName) { + return Integer.parseInt(folderName.substring(PREFIX.length())); + } + + + public File getFolder(int id) { + return new File(folder, PREFIX + id); + } + + + private void deleteTree(File file) { + File[] files = file.listFiles(); + + if (files != null) { + for (File fileDelete : files) { + deleteTree(fileDelete); + } + } + + file.delete(); + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index e2adc1f..d279864 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -104,7 +104,7 @@ public final class SharedNothingBackupActivation extends Activation { } // move all data away: activeMQServer.getNodeManager().stop(); - activeMQServer.moveServerData(); + activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize()); activeMQServer.getNodeManager().start(); synchronized (this) { if (closed) @@ -311,7 +311,7 @@ public final class SharedNothingBackupActivation extends Activation { } if (logger.isTraceEnabled()) { - logger.trace("setReplicaPolicy::" + replicaPolicy); + logger.trace("@@@ setReplicaPolicy::" + replicaPolicy); } replicaPolicy.getReplicatedPolicy().setReplicaPolicy(replicaPolicy); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index 3e0d812..6b222fb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -90,11 +90,16 @@ public class SharedNothingLiveActivation extends LiveActivation { try { if (replicatedPolicy.isCheckForLiveServer() && isNodeIdUsed()) { //set for when we failback + if (logger.isTraceEnabled()) { + logger.tracef("@@@ setting up replicatedPolicy.getReplicaPolicy for back start, replicaPolicy::%s, isBackup=%s, server=%s", replicatedPolicy.getReplicaPolicy(), replicatedPolicy.isBackup(), activeMQServer); + } replicatedPolicy.getReplicaPolicy().setReplicatedPolicy(replicatedPolicy); activeMQServer.setHAPolicy(replicatedPolicy.getReplicaPolicy()); return; } + logger.trace("@@@ did not do it now"); + activeMQServer.initialisePart1(false); activeMQServer.initialisePart2(false); @@ -175,16 +180,11 @@ public class SharedNothingLiveActivation extends LiveActivation { clusterConnection.addClusterTopologyListener(listener1); if (listener1.waitForBackup()) { //if we have to many backups kept or are not configured to restart just stop, otherwise restart as a backup - if (!replicatedPolicy.getReplicaPolicy().isRestartBackup() && activeMQServer.countNumberOfCopiedJournals() >= replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() && replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() >= 0) { - activeMQServer.stop(true); - ActiveMQServerLogger.LOGGER.stopReplicatedBackupAfterFailback(); - } - else { - activeMQServer.stop(true); - ActiveMQServerLogger.LOGGER.restartingReplicatedBackupAfterFailback(); - activeMQServer.setHAPolicy(replicatedPolicy.getReplicaPolicy()); - activeMQServer.start(); - } + activeMQServer.stop(true); + ActiveMQServerLogger.LOGGER.restartingReplicatedBackupAfterFailback(); +// activeMQServer.moveServerData(replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize()); + activeMQServer.setHAPolicy(replicatedPolicy.getReplicaPolicy()); + activeMQServer.start(); } else { ActiveMQServerLogger.LOGGER.failbackMissedBackupAnnouncement(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileMoveManagerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileMoveManagerTest.java new file mode 100644 index 0000000..0935c38 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileMoveManagerTest.java @@ -0,0 +1,346 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; +import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule; +import org.apache.activemq.artemis.utils.OrderedExecutorFactory; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class FileMoveManagerTest { + + @Rule + public TemporaryFolder temporaryFolder; + + @Rule + public ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule(); + + private File dataLocation; + private FileMoveManager manager; + + + @Before + public void setUp() { + dataLocation = new File(temporaryFolder.getRoot(), "data"); + dataLocation.mkdirs(); + manager = new FileMoveManager(dataLocation, 10); + } + + + public FileMoveManagerTest() { + File parent = new File("./target/tmp"); + parent.mkdirs(); + temporaryFolder = new TemporaryFolder(parent); + } + + @Test + public void testBackupFiles() { + int[] originalFiles = new int[12]; + int count = 0; + + // It will fake folders creation + for (int i = 0; i < 12; i++) { + originalFiles[count++] = i; + File bkp = new File(dataLocation, FileMoveManager.PREFIX + i); + bkp.mkdirs(); + } + + Assert.assertEquals(12, manager.getFolders().length); + Assert.assertEquals(12, manager.getNumberOfFolders()); + + + assertIDs(originalFiles, manager.getIDlist()); + } + + @Test + public void testMinMax() { + int[] originalFiles = new int[12]; + int count = 0; + + // It will fake folders creation + for (int i = 0; i < 5; i++) { + originalFiles[count++] = i; + File bkp = new File(dataLocation, FileMoveManager.PREFIX + i); + bkp.mkdirs(); + } + + // simulates a hole where someone removed a folder by hand + + // It will fake folders creation + for (int i = 7; i < 14; i++) { + originalFiles[count++] = i; + File bkp = new File(dataLocation, FileMoveManager.PREFIX + i); + bkp.mkdirs(); + } + + Assert.assertEquals(12, manager.getFolders().length); + Assert.assertEquals(12, manager.getNumberOfFolders()); + + int[] ids = manager.getIDlist(); + + assertIDs(originalFiles, ids); + + Assert.assertEquals(0, manager.getMinID()); + Assert.assertEquals(13, manager.getMaxID()); + + manager.setMaxFolders(3).checkOldFolders(); + + Assert.assertEquals(3, manager.getNumberOfFolders()); + Assert.assertEquals(13, manager.getMaxID()); + Assert.assertEquals(11, manager.getMinID()); + + } + + @Test + public void testGarbageCreated() { + // I'm pretending an admin created a folder here + File garbage = new File(dataLocation, "bkp.zzz"); + garbage.mkdirs(); + + testMinMax(); + + resetTmp(); + // the admin renamed a folder maybe + garbage = new File(dataLocation, "bkp.001.old"); + garbage.mkdirs(); + + resetTmp(); + + // the admin renamed a folder maybe + garbage = new File(dataLocation, "bkp.1.5"); + garbage.mkdirs(); + + testMinMax(); + } + + + @Test + public void testNoFolders() { + Assert.assertEquals(0, manager.getFolders().length); + Assert.assertEquals(0, manager.getNumberOfFolders()); + + Assert.assertTrue(dataLocation.delete()); + + Assert.assertEquals(0, manager.getFolders().length); + Assert.assertEquals(0, manager.getNumberOfFolders()); + } + + + @Test + public void testNoFiles() throws Exception { + // nothing to be moved, so why to do a backup + manager.doMove(); + + Assert.assertEquals(0, manager.getNumberOfFolders()); + } + + @Test + public void testMoveFiles() throws Exception { + manager.setMaxFolders(3); + + for (int bkp = 1; bkp <= 10; bkp++) { + for (int i = 0; i < 100; i++) { + createFile(dataLocation, i); + } + + manager.doMove(); + + // We will always have maximum of 3 folders + Assert.assertEquals(Math.min(bkp, manager.getMaxFolders()), manager.getNumberOfFolders()); + + File bkpFolder = manager.getFolder(bkp); + + FileMoveManager bkp1Manager = new FileMoveManager(bkpFolder, 10); + String[] filesAfterMove = bkp1Manager.getFiles(); + + for (String file : filesAfterMove) { + checkFile(bkpFolder, file); + } + } + + Assert.assertEquals(manager.getMaxFolders(), manager.getNumberOfFolders()); + + manager.setMaxFolders(0).checkOldFolders(); + + Assert.assertEquals(3, manager.getNumberOfFolders()); + + manager.setMaxFolders(1).checkOldFolders(); + Assert.assertEquals(1, manager.getNumberOfFolders()); + + + Assert.assertEquals(10, manager.getMaxID()); + Assert.assertEquals(10, manager.getMinID()); + } + + + @Test + public void testMoveFolders() throws Exception { + manager.setMaxFolders(3); + + int NUMBER_OF_FOLDERS = 10; + int FILES_PER_FOLDER = 10; + + for (int bkp = 1; bkp <= 10; bkp++) { + for (int f = 0; f < NUMBER_OF_FOLDERS; f++) { + File folderF = new File(dataLocation, "folder" + f); + folderF.mkdirs(); + + // FILES_PER_FOLDER + f, I'm just creating more files as f grows. + // this is just to make each folder unique somehow + for (int i = 0; i < FILES_PER_FOLDER + f; i++) { + createFile(folderF, i); + } + } + + manager.doMove(); + + // We will always have maximum of 3 folders + Assert.assertEquals(Math.min(bkp, manager.getMaxFolders()), manager.getNumberOfFolders()); + + File bkpFolder = manager.getFolder(bkp); + + for (int f = 0; f < NUMBER_OF_FOLDERS; f++) { + File fileTmp = new File(bkpFolder, "folder" + f); + + String[] filesOnFolder = fileTmp.list(); + + Assert.assertEquals(FILES_PER_FOLDER + f, filesOnFolder.length); + + for (String file : filesOnFolder) { + checkFile(fileTmp, file); + } + } + + } + + Assert.assertEquals(manager.getMaxFolders(), manager.getNumberOfFolders()); + + manager.setMaxFolders(0).checkOldFolders(); + + Assert.assertEquals(3, manager.getNumberOfFolders()); + + manager.setMaxFolders(1).checkOldFolders(); + Assert.assertEquals(1, manager.getNumberOfFolders()); + + + Assert.assertEquals(10, manager.getMaxID()); + Assert.assertEquals(10, manager.getMinID()); + } + + @Test + public void testMoveOverPaging() throws Exception { + AssertionLoggerHandler.startCapture(); + + ExecutorService threadPool = Executors.newCachedThreadPool(); + try { + manager.setMaxFolders(3); + for (int i = 1; i <= 10; i++) { + HierarchicalRepository<AddressSettings> addressSettings = new HierarchicalObjectRepository<>(); + AddressSettings settings = new AddressSettings(); + settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + addressSettings.setDefault(settings); + + final StorageManager storageManager = new NullStorageManager(); + + PagingStoreFactoryNIO storeFactory = + new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null, + new OrderedExecutorFactory(threadPool), true, null); + + PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings); + + managerImpl.start(); + + PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test")); + + store.startPaging(); + + store.stop(); + + managerImpl.stop(); + + manager.doMove(); + + Assert.assertEquals(Math.min(i, manager.getMaxFolders()), manager.getNumberOfFolders()); + } + + Assert.assertFalse("The loggers are complaining about address.txt", AssertionLoggerHandler.findText("address.txt")); + } + finally { + AssertionLoggerHandler.stopCapture(); + threadPool.shutdown(); + } + + + } + + + private void assertIDs(int[] originalFiles, int[] ids) { + Assert.assertEquals(originalFiles.length, ids.length); + for (int i = 0; i < ids.length; i++) { + Assert.assertEquals(originalFiles[i], ids[i]); + } + } + + private void resetTmp() { + temporaryFolder.delete(); + temporaryFolder.getRoot().mkdirs(); + Assert.assertEquals(0, manager.getNumberOfFolders()); + } + + private void createFile(File folder, int i) throws FileNotFoundException { + File dataFile = new File(folder, i + ".jrn"); + PrintWriter outData = new PrintWriter(new FileOutputStream(dataFile)); + outData.print(i); + outData.close(); + } + + private void checkFile(File bkpFolder, String file) throws IOException { + File fileRead = new File(bkpFolder, file); + InputStreamReader stream = new InputStreamReader(new FileInputStream(fileRead)); + BufferedReader reader = new BufferedReader(stream); + String valueRead = reader.readLine(); + int id = Integer.parseInt(file.substring(0, file.indexOf('.'))); + Assert.assertEquals("content of the file wasn't the expected", id, Integer.parseInt(valueRead)); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 6a9f729..f9a9535 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -1297,6 +1297,10 @@ public abstract class ActiveMQTestBase extends Assert { } protected void waitForServerToStart(ActiveMQServer server) throws InterruptedException { + waitForServerToStart(server, true); + } + + protected void waitForServerToStart(ActiveMQServer server, boolean activation) throws InterruptedException { if (server == null) return; final long wait = 5000; @@ -1310,9 +1314,12 @@ public abstract class ActiveMQTestBase extends Assert { fail("server didn't start: " + server); } - if (!server.getHAPolicy().isBackup()) { - if (!server.waitForActivation(wait, TimeUnit.MILLISECONDS)) - fail("Server didn't initialize: " + server); + + if (activation) { + if (!server.getHAPolicy().isBackup()) { + if (!server.waitForActivation(wait, TimeUnit.MILLISECONDS)) + fail("Server didn't initialize: " + server); + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/tests/config/logging.properties.trace ---------------------------------------------------------------------- diff --git a/tests/config/logging.properties.trace b/tests/config/logging.properties.trace index aa23850..cd6e364 100644 --- a/tests/config/logging.properties.trace +++ b/tests/config/logging.properties.trace @@ -51,7 +51,7 @@ handler.TEST.formatter=PATTERN # Formatter pattern configuration formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter formatter.PATTERN.properties=pattern -formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n +#formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n # Alternate format useful when debugging -#formatter.PATTERN.pattern=*** [%t] ***\n%d{HH:mm:ss,SSS} %-5p [%c] %s%E%n\n +formatter.PATTERN.pattern=*** [%t] ***\n%d{HH:mm:ss,SSS} %-5p [%c] %s%E%n\n http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java index fa520c9..c32ebc1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java @@ -41,15 +41,20 @@ import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.FileMoveManager; import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay; import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; import org.apache.activemq.artemis.utils.UUID; +import org.jboss.logging.Logger; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; public class BackupSyncJournalTest extends FailoverTestBase { + private static final Logger logger = Logger.getLogger(BackupSyncJournalTest.class); + protected static final int BACKUP_WAIT_TIME = 60; private ServerLocatorInternal locator; protected ClientSessionFactoryInternal sessionFactory; @@ -283,17 +288,28 @@ public class BackupSyncJournalTest extends FailoverTestBase { sendMessages(session, producer, 2 * n_msgs); assertFalse("must NOT be a backup", liveServer.getServer().getHAPolicy().isBackup()); adaptLiveConfigForReplicatedFailBack(liveServer); - liveServer.start(); + FileMoveManager liveMoveManager = new FileMoveManager(liveServer.getServer().getConfiguration().getJournalLocation(), -1); + liveServer.getServer().lockActivation(); + try { + liveServer.start(); + assertTrue("must have become a backup", liveServer.getServer().getHAPolicy().isBackup()); + Assert.assertEquals(0, liveMoveManager.getNumberOfFolders()); + } + finally { + liveServer.getServer().unlockActivation(); + } waitForServerToStart(liveServer.getServer()); - assertTrue("must have become a backup", liveServer.getServer().getHAPolicy().isBackup()); + liveServer.getServer().waitForActivation(10, TimeUnit.SECONDS); + Assert.assertEquals(1, liveMoveManager.getNumberOfFolders()); + assertTrue("must be active now", !liveServer.getServer().getHAPolicy().isBackup()); assertTrue("Fail-back must initialize live!", liveServer.getServer().waitForActivation(15, TimeUnit.SECONDS)); assertFalse("must be LIVE!", liveServer.getServer().getHAPolicy().isBackup()); int i = 0; - while (backupServer.isStarted() && i++ < 100) { + while (!backupServer.isStarted() && i++ < 100) { Thread.sleep(100); } - assertFalse("Backup should stop!", backupServer.getServer().isStarted()); + assertTrue(backupServer.getServer().isStarted()); assertTrue(liveServer.getServer().isStarted()); receiveMsgsInRange(0, 2 * n_msgs); assertNoMoreMessages(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java index ea9cb8e..f915a31 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java @@ -44,8 +44,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; -import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy; @@ -53,9 +51,12 @@ import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy; +import org.apache.activemq.artemis.core.server.impl.FileMoveManager; import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener; import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; import org.apache.activemq.artemis.utils.RandomUtil; @@ -518,10 +519,12 @@ public class FailoverTest extends FailoverTestBase { boolean doFailBack = true; HAPolicy haPolicy = backupServer.getServer().getHAPolicy(); if (haPolicy instanceof ReplicaPolicy) { - ((ReplicaPolicy) haPolicy).setMaxSavedReplicatedJournalsSize(0); + ((ReplicaPolicy) haPolicy).setMaxSavedReplicatedJournalsSize(1); } - simpleReplication(doFailBack); + simpleFailover(haPolicy instanceof ReplicaPolicy, doFailBack); + tearDown(); + setUp(); } @Test @@ -571,9 +574,10 @@ public class FailoverTest extends FailoverTestBase { } @Test - public void testSimpleReplication() throws Exception { - boolean doFailBack = false; - simpleReplication(doFailBack); + public void testSimpleFailover() throws Exception { + HAPolicy haPolicy = backupServer.getServer().getHAPolicy(); + + simpleFailover(haPolicy instanceof ReplicaPolicy, false); } @Test @@ -628,7 +632,7 @@ public class FailoverTest extends FailoverTestBase { * @param doFailBack * @throws Exception */ - private void simpleReplication(boolean doFailBack) throws Exception { + private void simpleFailover(boolean isReplicated, boolean doFailBack) throws Exception { locator.setFailoverOnInitialConnection(true); createSessionFactory(); ClientSession session = createSessionAndQueue(); @@ -660,10 +664,16 @@ public class FailoverTest extends FailoverTestBase { liveServer.start(); Assert.assertTrue("live initialized...", liveServer.getServer().waitForActivation(40, TimeUnit.SECONDS)); int i = 0; - while (backupServer.isStarted() && i++ < 100) { + while (!backupServer.isStarted() && i++ < 100) { Thread.sleep(100); } - Assert.assertFalse("Backup should stop!", backupServer.isStarted()); + liveServer.getServer().waitForActivation(5, TimeUnit.SECONDS); + Assert.assertTrue(backupServer.isStarted()); + + if (isReplicated) { + FileMoveManager moveManager = new FileMoveManager(backupServer.getServer().getConfiguration().getJournalLocation(), 0); + Assert.assertEquals(1, moveManager.getNumberOfFolders()); + } } else { backupServer.stop(); @@ -886,35 +896,49 @@ public class FailoverTest extends FailoverTestBase { @Test public void testTransactedMessagesNotSentSoNoRollback() throws Exception { - createSessionFactory(); + try { + createSessionFactory(); - ClientSession session = createSessionAndQueue(); + ClientSession session = createSessionAndQueue(); - ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS); + ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS); - sendMessagesSomeDurable(session, producer); + sendMessagesSomeDurable(session, producer); - session.commit(); + session.commit(); - crash(session); + crash(session); - // committing again should work since didn't send anything since last commit + // committing again should work since didn't send anything since last commit - Assert.assertFalse(session.isRollbackOnly()); + Assert.assertFalse(session.isRollbackOnly()); - session.commit(); + session.commit(); - ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS); + ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS); - session.start(); + session.start(); - receiveDurableMessages(consumer); + receiveDurableMessages(consumer); - Assert.assertNull(consumer.receiveImmediate()); + Assert.assertNull(consumer.receiveImmediate()); - session.commit(); + session.commit(); - session.close(); + session.close(); + } + finally { + try { + liveServer.getServer().stop(); + } + catch (Throwable ignored) { + } + try { + backupServer.getServer().stop(); + } + catch (Throwable ignored) { + } + } } @Test http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java index 68f65a4..66c48b5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java @@ -320,7 +320,7 @@ public class LiveToLiveFailoverTest extends FailoverTest { } @Override - public void testSimpleReplication() throws Exception { + public void testSimpleFailover() throws Exception { } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java index f03326e..8dcf905 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.failover; +import java.util.concurrent.TimeUnit; + import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; @@ -98,22 +100,31 @@ public class ReplicatedFailoverTest extends FailoverTest { liveServer.start(); - waitForRemoteBackupSynchronization(liveServer.getServer()); - waitForServerToStart(liveServer.getServer()); - //this will give the backup time to stop fully - waitForServerToStop(backupServer.getServer()); + backupServer.getServer().waitForActivation(5, TimeUnit.SECONDS); + + waitForRemoteBackupSynchronization(liveServer.getServer()); + + waitForServerToStart(backupServer.getServer()); - assertFalse(backupServer.getServer().isStarted()); + assertTrue(backupServer.getServer().isStarted()); - //the server wouldnt have reset to backup - assertFalse(backupServer.getServer().getHAPolicy().isBackup()); } finally { if (sf != null) { sf.close(); } + try { + liveServer.getServer().stop(); + } + catch (Throwable ignored) { + } + try { + backupServer.getServer().stop(); + } + catch (Throwable ignored) { + } } }
