ARTEMIS-2215 largemessage have been consumed but not deleted from the disk 
during backup and live sync


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0a47e1bc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0a47e1bc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0a47e1bc

Branch: refs/heads/master
Commit: 0a47e1bc6fee2474e45acd8b8a7ca366e4e724bf
Parents: 02fc3c5
Author: yb <[email protected]>
Authored: Sat Dec 29 16:09:48 2018 +0800
Committer: Clebert Suconic <[email protected]>
Committed: Wed Jan 9 17:25:02 2019 -0500

----------------------------------------------------------------------
 .../journal/AbstractJournalStorageManager.java  |   3 +-
 .../impl/journal/JournalStorageManager.java     |  23 +-
 .../failover/LiveCrashOnBackupSyncTest.java     | 275 +++++++++++++++++++
 3 files changed, 286 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0a47e1bc/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 9624c01..b227c98 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -113,6 +113,7 @@ import 
org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
 import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
 import org.jboss.logging.Logger;
@@ -193,7 +194,7 @@ public abstract class AbstractJournalStorageManager extends 
CriticalComponentImp
 
    protected final Map<SimpleString, PersistedAddressSetting> 
mapPersistedAddressSettings = new ConcurrentHashMap<>();
 
-   protected final Set<Long> largeMessagesToDelete = new HashSet<>();
+   protected final ConcurrentLongHashMap<LargeServerMessage> 
largeMessagesToDelete = new ConcurrentLongHashMap<>();
 
    public AbstractJournalStorageManager(final Configuration config,
                                         final CriticalAnalyzer analyzer,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0a47e1bc/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 1d954c8..c1bcd49 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -19,7 +19,6 @@ package 
org.apache.activemq.artemis.core.persistence.impl.journal;
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -309,17 +308,18 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
     */
    @Override
    protected void performCachedLargeMessageDeletes() {
-      for (Long largeMsgId : largeMessagesToDelete) {
-         SequentialFile msg = createFileForLargeMessage(largeMsgId, 
LargeMessageExtension.DURABLE);
+      largeMessagesToDelete.forEach((messageId, largeServerMessage) -> {
+         SequentialFile msg = createFileForLargeMessage(messageId, 
LargeMessageExtension.DURABLE);
          try {
             msg.delete();
          } catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, 
largeMsgId);
+            ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, 
messageId);
          }
          if (replicator != null) {
-            replicator.largeMessageDelete(largeMsgId, 
JournalStorageManager.this);
+            replicator.largeMessageDelete(messageId, 
JournalStorageManager.this);
          }
-      }
+         confirmLargeMessage(largeServerMessage);
+      });
       largeMessagesToDelete.clear();
    }
 
@@ -460,10 +460,7 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
          readLock();
          try {
             if (isReplicated() && replicator.isSynchronizing()) {
-               synchronized (largeMessagesToDelete) {
-                  
largeMessagesToDelete.add(Long.valueOf(largeServerMessage.getMessageID()));
-                  confirmLargeMessage(largeServerMessage);
-               }
+               largeMessagesToDelete.put(largeServerMessage.getMessageID(), 
largeServerMessage);
                return;
             }
          } finally {
@@ -724,11 +721,9 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
       // only send durable messages... // listFiles append a "." to anything...
       List<String> filenames = largeMessagesFactory.listFiles("msg");
 
-      List<Long> idList = new ArrayList<>();
       for (String filename : filenames) {
-         Long id = getLargeMessageIdFromFilename(filename);
-         if (!largeMessagesToDelete.contains(id)) {
-            idList.add(id);
+         long id = getLargeMessageIdFromFilename(filename);
+         if (!largeMessagesToDelete.containsKey(id)) {
             SequentialFile seqFile = 
largeMessagesFactory.createSequentialFile(filename);
             long size = seqFile.size();
             largeMessages.put(id, new Pair<>(filename, size));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0a47e1bc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java
new file mode 100644
index 0000000..c3692ea
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveCrashOnBackupSyncTest.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
+import 
org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.replication.ReplicationManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.junit.Wait;
+import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class LiveCrashOnBackupSyncTest {
+
+   static int OK = 2;
+   public static String instancePth = System.getProperty("java.io.tmpdir");
+
+   @Before
+   public void setUp() throws Exception {
+      deleteFile(new File(instancePth + "live"));
+      deleteFile(new File(instancePth + "backup"));
+   }
+
+   @Test
+   public void liveCrashOnBackupSyncLargeMessageTest() throws Exception {
+      Process process = 
SpawnedVMSupport.spawnVM(LiveCrashOnBackupSyncTest.class.getCanonicalName());
+      Assert.assertEquals(OK, process.waitFor());
+
+      Configuration liveConfiguration = createLiveConfiguration();
+      ActiveMQServer liveServer = 
ActiveMQServers.newActiveMQServer(liveConfiguration);
+      liveServer.start();
+      Wait.waitFor(() -> liveServer.isStarted());
+
+      File liveLMDir = 
liveServer.getConfiguration().getLargeMessagesLocation();
+      Set<Long> liveLM = getAllMessageFileIds(liveLMDir);
+      Assert.assertEquals("we really ought to delete these after delivery", 0, 
liveLM.size());
+      liveServer.stop();
+   }
+
+   @After
+   public void tearDown() throws FileNotFoundException {
+      deleteFile(new File(instancePth + "live"));
+      deleteFile(new File(instancePth + "backup"));
+   }
+
+   private Configuration createLiveConfiguration() throws Exception {
+      Configuration conf = new ConfigurationImpl();
+      conf.setName("localhost::live");
+      File liveDir = newFolder("live");
+      conf.setBrokerInstance(liveDir);
+      conf.addAcceptorConfiguration("live", "tcp://localhost:61616");
+      conf.addConnectorConfiguration("backup", "tcp://localhost:61617");
+      conf.addConnectorConfiguration("live", "tcp://localhost:61616");
+      conf.setClusterUser("mycluster");
+      conf.setClusterPassword("mypassword");
+      ReplicatedPolicyConfiguration haPolicy = new 
ReplicatedPolicyConfiguration();
+      haPolicy.setVoteOnReplicationFailure(false);
+      haPolicy.setCheckForLiveServer(false);
+      conf.setHAPolicyConfiguration(haPolicy);
+      ClusterConnectionConfiguration ccconf = new 
ClusterConnectionConfiguration();
+      ccconf.setStaticConnectors(new 
ArrayList<>()).getStaticConnectors().add("backup");
+      ccconf.setName("cluster");
+      ccconf.setConnectorName("live");
+      conf.addClusterConfiguration(ccconf);
+
+      
conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024
 * 512).setConnectionTTLOverride(60_000L);
+      return conf;
+   }
+
+   private Configuration createBackupConfiguration() throws Exception {
+      Configuration conf = new ConfigurationImpl();
+      conf.setName("localhost::backup");
+      File backupDir = newFolder("backup");
+      conf.setBrokerInstance(backupDir);
+      ReplicaPolicyConfiguration haPolicy = new ReplicaPolicyConfiguration();
+      haPolicy.setClusterName("cluster");
+      conf.setHAPolicyConfiguration(haPolicy);
+      conf.addAcceptorConfiguration("backup", "tcp://localhost:61617");
+      conf.addConnectorConfiguration("live", "tcp://localhost:61616");
+      conf.addConnectorConfiguration("backup", "tcp://localhost:61617");
+      conf.setClusterUser("mycluster");
+      conf.setClusterPassword("mypassword");
+      ClusterConnectionConfiguration ccconf = new 
ClusterConnectionConfiguration();
+      ccconf.setStaticConnectors(new 
ArrayList<>()).getStaticConnectors().add("live");
+      ccconf.setName("cluster");
+      ccconf.setConnectorName("backup");
+      conf.addClusterConfiguration(ccconf);
+
+      
conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024
 * 512).setConnectionTTLOverride(60_000L);
+      return conf;
+   }
+
+   private File newFolder(String live) throws IOException {
+      File file = new File(instancePth + live);
+      if (!file.exists()) {
+         file.mkdirs();
+      }
+      return file;
+   }
+
+   private void deleteFile(File path) throws FileNotFoundException {
+      if (!path.exists())
+         return;
+      if (path.isDirectory()) {
+         for (File f : path.listFiles()) {
+            deleteFile(f);
+         }
+      }
+      path.delete();
+   }
+
+   private void createProducerSendSomeLargeMessages(int msgCount) throws 
Exception {
+      byte[] buffer = new byte[100 * 1024];
+      ServerLocator locator = 
ServerLocatorImpl.newLocator("tcp://localhost:61616");
+      locator.setCallTimeout(60_000L);
+      locator.setConnectionTTL(60_000L);
+      final ClientSessionFactory csf = locator.createSessionFactory();
+      ClientSession session = csf.createSession();
+      session.createQueue("LiveCrashTestQueue", RoutingType.ANYCAST, 
"LiveCrashTestQueue", true);
+      ClientProducer producer = session.createProducer("LiveCrashTestQueue");
+      ClientMessage msgs = session.createMessage(true);
+      msgs.getBodyBuffer().writeBytes(buffer);
+      for (int i = 0; i < msgCount; i++) {
+         producer.send(msgs);
+      }
+      producer.close();
+      session.close();
+   }
+
+   private void receiveMsgs(int msgCount) throws Exception {
+      ServerLocator locator = 
ServerLocatorImpl.newLocator("tcp://localhost:61616");
+      locator.setCallTimeout(60_000L);
+      locator.setConnectionTTL(60_000L);
+      final ClientSessionFactory csf = locator.createSessionFactory();
+      ClientSession session = csf.createSession();
+      session.start();
+      ClientConsumer consumer = session.createConsumer("LiveCrashTestQueue");
+      for (int i = 0; i < msgCount; i++) {
+         ClientMessage message = consumer.receive(1000);
+         Assert.assertNotNull("Expecting a message " + i, message);
+         message.acknowledge();
+      }
+      session.commit();
+      consumer.close();
+      session.close();
+   }
+
+   private Set<Long> getAllMessageFileIds(File dir) {
+      Set<Long> idsOnBkp = new TreeSet<>();
+      String[] fileList = dir.list();
+      if (fileList != null) {
+         for (String filename : fileList) {
+            if (filename.endsWith(".msg")) {
+               idsOnBkp.add(Long.valueOf(filename.split("\\.")[0]));
+            }
+         }
+      }
+      return idsOnBkp;
+   }
+
+   public static void main(String[] arg) {
+      try {
+         LiveCrashOnBackupSyncTest stop = new LiveCrashOnBackupSyncTest();
+         Configuration liveConfiguration = stop.createLiveConfiguration();
+         ActiveMQServer liveServer = new ActiveMQServerImpl(liveConfiguration, 
ManagementFactory.getPlatformMBeanServer(), new 
ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new 
SecurityConfiguration())) {
+            @Override
+            protected PagingStoreFactoryNIO getPagingStoreFactory() {
+               return new PagingStoreFactoryNIO(this.getStorageManager(), 
this.getConfiguration().getPagingLocation(), 
this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), 
this.getExecutorFactory(), 
this.getConfiguration().isJournalSyncNonTransactional(), null) {
+                  @Override
+                  public synchronized PagingStore newStore(SimpleString 
address, AddressSettings settings) {
+                     return new DelayPagingStoreImpl(address, 
this.getScheduledExecutor(), liveConfiguration.getJournalBufferTimeout_NIO(), 
getPagingManager(), getStorageManager(), null, this, address, settings, 
getExecutorFactory().getExecutor(), this.isSyncNonTransactional());
+                  }
+               };
+            }
+         };
+         liveServer.start();
+         Wait.waitFor(() -> liveServer.isStarted());
+
+         Configuration backupConfiguration = stop.createBackupConfiguration();
+         ActiveMQServer backupServer = 
ActiveMQServers.newActiveMQServer(backupConfiguration);
+         backupServer.start();
+         Wait.waitFor(() -> backupServer.isStarted());
+
+         stop.createProducerSendSomeLargeMessages(100);
+         stop.receiveMsgs(100);
+
+         //flush
+         liveServer.getStorageManager().getMessageJournal().stop();
+
+         System.exit(OK);
+      } catch (Exception e) {
+         e.printStackTrace();
+      } catch (Throwable throwable) {
+         throwable.printStackTrace();
+      }
+   }
+}
+
+class DelayPagingStoreImpl extends PagingStoreImpl {
+
+   DelayPagingStoreImpl(SimpleString address,
+                               ScheduledExecutorService scheduledExecutor,
+                               long syncTimeout,
+                               PagingManager pagingManager,
+                               StorageManager storageManager,
+                               SequentialFileFactory fileFactory,
+                               PagingStoreFactory storeFactory,
+                               SimpleString storeName,
+                               AddressSettings addressSettings,
+                               ArtemisExecutor executor,
+                               boolean syncNonTransactional) {
+      super(address, scheduledExecutor, syncTimeout, pagingManager, 
storageManager, fileFactory, storeFactory, storeName, addressSettings, 
executor, syncNonTransactional);
+   }
+
+   @Override
+   public void sendPages(ReplicationManager replicator, Collection<Integer> 
pageIds) throws Exception {
+      //in order to extend the synchronization time
+      Thread.sleep(20 * 1000);
+      super.sendPages(replicator, pageIds);
+   }
+}
\ No newline at end of file

Reply via email to