This is an automated email from the ASF dual-hosted git repository. michaelpearce pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new 3954f01 ARTEMIS-2186 Large message incomplete when server is crashed new 3aebe74 This closes #2444 3954f01 is described below commit 3954f0183f63291722f504e7d41beb052a6639aa Author: yang wei <wy96...@gmail.com> AuthorDate: Wed Nov 28 20:37:09 2018 +0800 ARTEMIS-2186 Large message incomplete when server is crashed --- .../impl/journal/LargeServerMessageImpl.java | 1 + .../largemessage/ServerLargeMessageTest.java | 160 ++++++++++++++++++++- 2 files changed, 160 insertions(+), 1 deletion(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 431c19d..42a76be 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -327,6 +327,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe public synchronized void releaseResources() { if (file != null && file.isOpen()) { try { + file.sync(); file.close(); } catch (Exception e) { ActiveMQServerLogger.LOGGER.largeMessageErrorReleasingResources(e); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java index e325273..6f45d16 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java @@ -16,11 +16,17 @@ */ package org.apache.activemq.artemis.tests.integration.largemessage; +import java.io.File; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.URL; +import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -30,6 +36,10 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; +import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.security.Role; @@ -38,6 +48,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.tests.integration.security.SecurityTest; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -177,7 +188,154 @@ public class ServerLargeMessageTest extends ActiveMQTestBase { } } - // Package protected --------------------------------------------- + @Test + public void testLargeServerMessageSync() throws Exception { + final AtomicBoolean open = new AtomicBoolean(false); + final AtomicBoolean sync = new AtomicBoolean(false); + + JournalStorageManager storageManager = new JournalStorageManager(createDefaultInVMConfig(), EmptyCriticalAnalyzer.getInstance(), getOrderedExecutor(), getOrderedExecutor()) { + @Override + public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) { + return new SequentialFile() { + @Override + public boolean isOpen() { + return open.get(); + } + + @Override + public boolean exists() { + return true; + } + + @Override + public void open() throws Exception { + open.set(true); + } + + @Override + public void open(int maxIO, boolean useExecutor) throws Exception { + open.set(true); + } + + @Override + public boolean fits(int size) { + return false; + } + + @Override + public int calculateBlockStart(int position) throws Exception { + return 0; + } + + @Override + public String getFileName() { + return null; + } + + @Override + public void fill(int size) throws Exception { + } + + @Override + public void delete() throws IOException, InterruptedException, ActiveMQException { + } + + @Override + public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception { + } + + @Override + public void write(ActiveMQBuffer bytes, boolean sync) throws Exception { + } + + @Override + public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception { + } + + @Override + public void write(EncodingSupport bytes, boolean sync) throws Exception { + } + + @Override + public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) { + } + + @Override + public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception { + } + + @Override + public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception { + } + + @Override + public int read(ByteBuffer bytes, IOCallback callback) throws Exception { + return 0; + } + + @Override + public int read(ByteBuffer bytes) throws Exception { + return 0; + } + + @Override + public void position(long pos) throws IOException { + } + + @Override + public long position() { + return 0; + } + + @Override + public void close() throws Exception { + open.set(false); + } + + @Override + public void sync() throws IOException { + sync.set(true); + } + + @Override + public long size() throws Exception { + return 0; + } + + @Override + public void renameTo(String newFileName) throws Exception { + } + + @Override + public SequentialFile cloneFile() { + return null; + } + + @Override + public void copyTo(SequentialFile newFileName) throws Exception { + } + + @Override + public void setTimedBuffer(TimedBuffer buffer) { + } + + @Override + public File getJavaFile() { + return null; + } + }; + } + }; + + LargeServerMessageImpl largeServerMessage = new LargeServerMessageImpl(storageManager); + largeServerMessage.setMessageID(1234); + largeServerMessage.addBytes(new byte[0]); + assertTrue(open.get()); + largeServerMessage.releaseResources(); + assertTrue(sync.get()); + } + + // Package protected --------------------------------------------- // Protected -----------------------------------------------------