This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 3954f01  ARTEMIS-2186 Large message incomplete when server is crashed
     new 3aebe74  This closes #2444
3954f01 is described below

commit 3954f0183f63291722f504e7d41beb052a6639aa
Author: yang wei <wy96...@gmail.com>
AuthorDate: Wed Nov 28 20:37:09 2018 +0800

    ARTEMIS-2186 Large message incomplete when server is crashed
---
 .../impl/journal/LargeServerMessageImpl.java       |   1 +
 .../largemessage/ServerLargeMessageTest.java       | 160 ++++++++++++++++++++-
 2 files changed, 160 insertions(+), 1 deletion(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 431c19d..42a76be 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -327,6 +327,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
    public synchronized void releaseResources() {
       if (file != null && file.isOpen()) {
          try {
+            file.sync();
             file.close();
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.largeMessageErrorReleasingResources(e);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
index e325273..6f45d16 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
@@ -16,11 +16,17 @@
  */
 package org.apache.activemq.artemis.tests.integration.largemessage;
 
+import java.io.File;
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -30,6 +36,10 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.security.Role;
@@ -38,6 +48,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
 import org.apache.activemq.artemis.tests.integration.security.SecurityTest;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -177,7 +188,154 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
       }
    }
 
-   // Package protected ---------------------------------------------
+   @Test
+   public void testLargeServerMessageSync() throws Exception {
+      final AtomicBoolean open = new AtomicBoolean(false);
+      final AtomicBoolean sync = new AtomicBoolean(false);
+
+      JournalStorageManager storageManager = new JournalStorageManager(createDefaultInVMConfig(), EmptyCriticalAnalyzer.getInstance(), getOrderedExecutor(), getOrderedExecutor()) {
+         @Override
+         public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) {
+            return new SequentialFile() {
+               @Override
+               public boolean isOpen() {
+                  return open.get();
+               }
+
+               @Override
+               public boolean exists() {
+                  return true;
+               }
+
+               @Override
+               public void open() throws Exception {
+                  open.set(true);
+               }
+
+               @Override
+               public void open(int maxIO, boolean useExecutor) throws Exception {
+                  open.set(true);
+               }
+
+               @Override
+               public boolean fits(int size) {
+                  return false;
+               }
+
+               @Override
+               public int calculateBlockStart(int position) throws Exception {
+                  return 0;
+               }
+
+               @Override
+               public String getFileName() {
+                  return null;
+               }
+
+               @Override
+               public void fill(int size) throws Exception {
+               }
+
+               @Override
+               public void delete() throws IOException, InterruptedException, ActiveMQException {
+               }
+
+               @Override
+               public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
+               }
+
+               @Override
+               public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
+               }
+
+               @Override
+               public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception {
+               }
+
+               @Override
+               public void write(EncodingSupport bytes, boolean sync) throws Exception {
+               }
+
+               @Override
+               public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
+               }
+
+               @Override
+               public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
+               }
+
+               @Override
+               public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
+               }
+
+               @Override
+               public int read(ByteBuffer bytes, IOCallback callback) throws Exception {
+                  return 0;
+               }
+
+               @Override
+               public int read(ByteBuffer bytes) throws Exception {
+                  return 0;
+               }
+
+               @Override
+               public void position(long pos) throws IOException {
+               }
+
+               @Override
+               public long position() {
+                  return 0;
+               }
+
+               @Override
+               public void close() throws Exception {
+                  open.set(false);
+               }
+
+               @Override
+               public void sync() throws IOException {
+                  sync.set(true);
+               }
+
+               @Override
+               public long size() throws Exception {
+                  return 0;
+               }
+
+               @Override
+               public void renameTo(String newFileName) throws Exception {
+               }
+
+               @Override
+               public SequentialFile cloneFile() {
+                  return null;
+               }
+
+               @Override
+               public void copyTo(SequentialFile newFileName) throws Exception {
+               }
+
+               @Override
+               public void setTimedBuffer(TimedBuffer buffer) {
+               }
+
+               @Override
+               public File getJavaFile() {
+                  return null;
+               }
+            };
+         }
+      };
+
+      LargeServerMessageImpl largeServerMessage = new LargeServerMessageImpl(storageManager);
+      largeServerMessage.setMessageID(1234);
+      largeServerMessage.addBytes(new byte[0]);
+      assertTrue(open.get());
+      largeServerMessage.releaseResources();
+      assertTrue(sync.get());
+   }
+
+      // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 

Reply via email to