Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x e17c59d12 -> 47d33f3a7


ARTEMIS-1989 Replication catch up leaking files

The regression was introduced by this related commit:
https://github.com/hornetq/hornetq/commit/837694e70573069cf78d1911975bef95925b6f29

(cherry picked from commit 56be281aafdbfaea2059c0ed5bd1e065f7710124)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/be3fb960
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/be3fb960
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/be3fb960

Branch: refs/heads/2.6.x
Commit: be3fb960ed2d22243c7878725869f24a5e74de6f
Parents: e17c59d
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Tue Jul 24 21:54:18 2018 -0400
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Wed Jul 25 10:51:04 2018 -0400

----------------------------------------------------------------------
 .../artemis/core/io/nio/NIOSequentialFile.java  |   2 +-
 .../core/io/nio/NIOSequentialFileFactory.java   |   2 +-
 .../core/paging/impl/PagingManagerImpl.java     |  13 +-
 .../core/paging/impl/PagingStoreFactoryNIO.java |  38 ++++-
 .../core/replication/ReplicationEndpoint.java   |   9 +-
 .../artemis/core/server/ActiveMQServer.java     |   2 +
 .../core/server/impl/ActiveMQServerImpl.java    |   3 +-
 ...SharedNothingReplicationFlowControlTest.java | 140 ++++++++++++++++++-
 8 files changed, 194 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be3fb960/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
----------------------------------------------------------------------
diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
index 891bd5c..55654b7 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
@@ -37,7 +37,7 @@ import 
org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.apache.activemq.artemis.utils.Env;
 
-public final class NIOSequentialFile extends AbstractSequentialFile {
+public class NIOSequentialFile extends AbstractSequentialFile {
 
    private FileChannel channel;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be3fb960/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
index b585b24..c142377 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
@@ -29,7 +29,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.utils.Env;
 import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 
-public final class NIOSequentialFileFactory extends 
AbstractSequentialFileFactory {
+public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
 
    private static final int DEFAULT_CAPACITY_ALIGNMENT = Env.osPageSize();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be3fb960/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 44e8067..bca70cf 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -62,7 +62,7 @@ public final class PagingManagerImpl implements PagingManager 
{
 
    private final HierarchicalRepository<AddressSettings> 
addressSettingsRepository;
 
-   private final PagingStoreFactory pagingStoreFactory;
+   private PagingStoreFactory pagingStoreFactory;
 
    private final AtomicLong globalSizeBytes = new AtomicLong(0);
 
@@ -84,6 +84,17 @@ public final class PagingManagerImpl implements 
PagingManager {
    // Constructors
    // 
--------------------------------------------------------------------------------------------------------------------
 
+
+   // for tests.. not part of the API
+   public void replacePageStoreFactory(PagingStoreFactory factory) {
+      this.pagingStoreFactory = factory;
+   }
+
+   // for tests.. not part of the API
+   public PagingStoreFactory getPagingStoreFactory() {
+      return pagingStoreFactory;
+   }
+
    public PagingManagerImpl(final PagingStoreFactory pagingSPI,
                             final HierarchicalRepository<AddressSettings> 
addressSettingsRepository,
                             final long maxSize) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be3fb960/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
index b2e3d4f..aa71c0e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
@@ -63,7 +63,7 @@ public class PagingStoreFactoryNIO implements 
PagingStoreFactory {
 
    private final ExecutorFactory executorFactory;
 
-   protected final boolean syncNonTransactional;
+   private final boolean syncNonTransactional;
 
    private PagingManager pagingManager;
 
@@ -71,10 +71,38 @@ public class PagingStoreFactoryNIO implements 
PagingStoreFactory {
 
    private final long syncTimeout;
 
-   protected final StorageManager storageManager;
+   private final StorageManager storageManager;
 
    private final IOCriticalErrorListener critialErrorListener;
 
+   public File getDirectory() {
+      return directory;
+   }
+
+   public ExecutorFactory getExecutorFactory() {
+      return executorFactory;
+   }
+
+   public boolean isSyncNonTransactional() {
+      return syncNonTransactional;
+   }
+
+   public PagingManager getPagingManager() {
+      return pagingManager;
+   }
+
+   public long getSyncTimeout() {
+      return syncTimeout;
+   }
+
+   public StorageManager getStorageManager() {
+      return storageManager;
+   }
+
+   public IOCriticalErrorListener getCritialErrorListener() {
+      return critialErrorListener;
+   }
+
    public PagingStoreFactoryNIO(final StorageManager storageManager,
                                 final File directory,
                                 final long syncTimeout,
@@ -135,9 +163,7 @@ public class PagingStoreFactoryNIO implements 
PagingStoreFactory {
 
       factory.createDirs();
 
-      File fileWithID = new File(directory, guid +
-         File.separatorChar +
-         PagingStoreFactoryNIO.ADDRESS_FILE);
+      File fileWithID = new File(directory, guid + File.separatorChar + 
PagingStoreFactoryNIO.ADDRESS_FILE);
 
       try (BufferedWriter writer = new BufferedWriter(new 
OutputStreamWriter(new FileOutputStream(fileWithID)))) {
          writer.write(address.toString());
@@ -197,7 +223,7 @@ public class PagingStoreFactoryNIO implements 
PagingStoreFactory {
       }
    }
 
-   private SequentialFileFactory newFileFactory(final String directoryName) {
+   protected SequentialFileFactory newFileFactory(final String directoryName) {
 
       return new NIOSequentialFileFactory(new File(directory, directoryName), 
false, critialErrorListener, 1);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be3fb960/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 6e45a8c..15d5311 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -43,8 +43,6 @@ import 
org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.impl.Page;
-import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
-import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageInSync;
@@ -262,7 +260,8 @@ public final class ReplicationEndpoint implements 
ChannelHandler, ActiveMQCompon
             journalLoadInformation[jc.typeByte] = 
journalsHolder.get(jc).loadSyncOnly(JournalState.SYNCING);
          }
 
-         pageManager = new PagingManagerImpl(new 
PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 
config.getJournalBufferTimeout_NIO(), server.getScheduledPool(), 
server.getIOExecutorFactory(), config.isJournalSyncNonTransactional(), 
criticalErrorListener), server.getAddressSettingsRepository());
+
+         pageManager = server.createPagingManager();
 
          pageManager.start();
 
@@ -446,6 +445,10 @@ public final class ReplicationEndpoint implements 
ChannelHandler, ActiveMQCompon
       }
 
       if (data == null) {
+         // this means close file
+         if (channel1.isOpen()) {
+            channel1.close();
+         }
          return;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be3fb960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 78ebbb7..130ce22 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -128,6 +128,8 @@ public interface ActiveMQServer extends ServiceComponent {
 
    PagingManager getPagingManager();
 
+   PagingManager createPagingManager() throws Exception;
+
    ManagementService getManagementService();
 
    ActiveMQSecurityManager getSecurityManager();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be3fb960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 52f88cd..281be23 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2131,7 +2131,8 @@ public class ActiveMQServerImpl implements ActiveMQServer 
{
       this.queueFactory = factory;
    }
 
-   protected PagingManager createPagingManager() throws Exception {
+   @Override
+   public PagingManager createPagingManager() throws Exception {
       return new PagingManagerImpl(getPagingStoreFactory(), 
addressSettingsRepository, configuration.getGlobalMaxSize());
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be3fb960/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
index 981b355..5508c94 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java
@@ -18,17 +18,22 @@
 package org.apache.activemq.artemis.tests.integration.replication;
 
 import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
@@ -40,20 +45,34 @@ import 
org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
 import 
org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFile;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.junit.Wait;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.jboss.logging.Logger;
 import org.junit.Assert;
 import org.junit.Before;
@@ -65,7 +84,6 @@ public class SharedNothingReplicationFlowControlTest extends 
ActiveMQTestBase {
 
    ExecutorService sendMessageExecutor;
 
-
    @Before
    public void setupExecutor() {
       sendMessageExecutor = Executors.newCachedThreadPool();
@@ -98,8 +116,8 @@ public class SharedNothingReplicationFlowControlTest extends 
ActiveMQTestBase {
       ClientSession sess = csf.createSession();
       sess.createQueue("flowcontrol", RoutingType.ANYCAST, "flowcontrol", 
true);
       sess.close();
-      Executor sendMessageExecutor = Executors.newCachedThreadPool();
 
+      Executor sendMessageExecutor = Executors.newCachedThreadPool();
       int i = 0;
       final int j = 100;
       final CountDownLatch allMessageSent = new CountDownLatch(j);
@@ -193,6 +211,124 @@ public class SharedNothingReplicationFlowControlTest 
extends ActiveMQTestBase {
       Assert.assertEquals("Backup did not replicated all journal", j, 
replicationCounter.get());
    }
 
+   @Test
+   public void testSendPages() throws Exception {
+      // start live
+      Configuration liveConfiguration = createLiveConfiguration();
+      ActiveMQServer liveServer = 
addServer(ActiveMQServers.newActiveMQServer(liveConfiguration));
+      liveServer.start();
+
+      Wait.waitFor(() -> liveServer.isStarted());
+
+      ServerLocator locator = 
ServerLocatorImpl.newLocator("tcp://localhost:61616");
+      locator.setCallTimeout(60_000L);
+      locator.setConnectionTTL(60_000L);
+
+      final ClientSessionFactory csf = locator.createSessionFactory();
+      ClientSession sess = csf.createSession();
+      sess.createQueue("flowcontrol", RoutingType.ANYCAST, "flowcontrol", 
true);
+
+      PagingStore store = 
liveServer.getPagingManager().getPageStore(SimpleString.toSimpleString("flowcontrol"));
+      store.startPaging();
+
+      ClientProducer prod = sess.createProducer("flowcontrol");
+      for (int i = 0; i < 100; i++) {
+         prod.send(sess.createMessage(true));
+
+         if (i % 10 == 0) {
+            sess.commit();
+            store.forceAnotherPage();
+         }
+      }
+
+      sess.close();
+
+      openCount.set(0);
+      closeCount.set(0);
+      // start backup
+      Configuration backupConfiguration = 
createBackupConfiguration().setNetworkCheckURLList(null);
+
+      ActiveMQServer backupServer = new 
ActiveMQServerImpl(backupConfiguration, 
ManagementFactory.getPlatformMBeanServer(), new 
ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new 
SecurityConfiguration())) {
+         @Override
+         public PagingManager createPagingManager() throws Exception {
+            PagingManagerImpl manager = (PagingManagerImpl) 
super.createPagingManager();
+            PagingStoreFactoryNIO originalPageStore = (PagingStoreFactoryNIO) 
manager.getPagingStoreFactory();
+            manager.replacePageStoreFactory(new 
PageStoreFactoryTestable(originalPageStore));
+            return manager;
+         }
+      };
+
+      addServer(backupServer).start();
+
+      Wait.waitFor(() -> backupServer.isStarted());
+
+      Wait.waitFor(backupServer::isReplicaSync, 30000);
+
+      PageStoreFactoryTestable testablePageStoreFactory = 
(PageStoreFactoryTestable) ((PagingManagerImpl) 
backupServer.getPagingManager()).getPagingStoreFactory();
+
+      Assert.assertEquals(openCount.get(), closeCount.get());
+   }
+
+   static AtomicInteger openCount = new AtomicInteger(0);
+   static AtomicInteger closeCount = new AtomicInteger(0);
+
+   private static class PageStoreFactoryTestable extends PagingStoreFactoryNIO 
{
+
+      PageStoreFactoryTestable(StorageManager storageManager,
+                                      File directory,
+                                      long syncTimeout,
+                                      ScheduledExecutorService 
scheduledExecutor,
+                                      ExecutorFactory executorFactory,
+                                      boolean syncNonTransactional,
+                                      IOCriticalErrorListener 
critialErrorListener) {
+         super(storageManager, directory, syncTimeout, scheduledExecutor, 
executorFactory, syncNonTransactional, critialErrorListener);
+      }
+
+      PageStoreFactoryTestable(PagingStoreFactoryNIO other) {
+         this(other.getStorageManager(), other.getDirectory(), 
other.getSyncTimeout(), other.getScheduledExecutor(), 
other.getExecutorFactory(), other.isSyncNonTransactional(), 
other.getCritialErrorListener());
+      }
+
+      @Override
+      protected SequentialFileFactory newFileFactory(String directoryName) {
+         return new TestableNIOFactory(new File(getDirectory(), 
directoryName), false, getCritialErrorListener(), 1);
+      }
+   }
+
+   public static class TestableNIOFactory extends NIOSequentialFileFactory {
+
+      public TestableNIOFactory(File journalDir, boolean buffered, 
IOCriticalErrorListener listener, int maxIO) {
+         super(journalDir, buffered, listener, maxIO);
+      }
+
+      @Override
+      public SequentialFile createSequentialFile(String fileName) {
+         return new TestableSequentialFile(this, journalDir, fileName, maxIO, 
writeExecutor);
+      }
+   }
+
+   public static class TestableSequentialFile extends NIOSequentialFile {
+
+      public TestableSequentialFile(SequentialFileFactory factory,
+                                    File directory,
+                                    String file,
+                                    int maxIO,
+                                    Executor writerExecutor) {
+         super(factory, directory, file, maxIO, writerExecutor);
+      }
+
+      @Override
+      public void open(int maxIO, boolean useExecutor) throws IOException {
+         super.open(maxIO, useExecutor);
+         openCount.incrementAndGet();
+      }
+
+      @Override
+      public synchronized void close() throws IOException, 
InterruptedException, ActiveMQException {
+         super.close();
+         closeCount.incrementAndGet();
+      }
+   }
+
    // Set a small call timeout and write buffer high water mark value to 
trigger replication flow control
    private Configuration createLiveConfiguration() throws Exception {
       Configuration conf = new ConfigurationImpl();

Reply via email to