[
https://issues.apache.org/jira/browse/ARTEMIS-4193?focusedWorklogId=849807&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-849807
]
ASF GitHub Bot logged work on ARTEMIS-4193:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 08/Mar/23 12:57
Start Date: 08/Mar/23 12:57
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #4395:
URL: https://github.com/apache/activemq-artemis/pull/4395#discussion_r1129337551
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -216,6 +219,10 @@ public void rebuild() throws Exception {
try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
while (iter.hasNext()) {
PagedMessage msg = iter.next();
+ if (storedLargeMessages != null && msg.getMessage().isLargeMessage()) {
+ logger.debug("removing storedLargeMessage {}", msg.getMessage().getMessageID());
Review Comment:
As getMessageID() returns a long, it will be autoboxed here. May be worth a
gate (logger.isDebugEnabled()), since all the similar logging before and after
this already uses one.
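For illustration, a minimal sketch of the gated form. The `DebugLog` interface and `RecordingLog` class are hypothetical stand-ins for the two SLF4J `Logger` methods involved, so the snippet runs without any dependency; only the shape of `logRemoval` is the suggestion.

```java
import java.util.ArrayList;
import java.util.List;

final class LogGateSketch {

    /** Hypothetical stand-in for the two SLF4J Logger methods involved. */
    interface DebugLog {
        boolean isDebugEnabled();
        void debug(String format, Object arg);
    }

    /** Recording implementation that can be switched on or off. */
    static final class RecordingLog implements DebugLog {
        final boolean enabled;
        final List<String> lines = new ArrayList<>();

        RecordingLog(boolean enabled) {
            this.enabled = enabled;
        }

        @Override
        public boolean isDebugEnabled() {
            return enabled;
        }

        @Override
        public void debug(String format, Object arg) {
            if (enabled) {
                lines.add(format.replace("{}", String.valueOf(arg)));
            }
        }
    }

    /** Gated form: the long is boxed to a Long only when debug is enabled. */
    static void logRemoval(DebugLog logger, long messageID) {
        if (logger.isDebugEnabled()) {
            logger.debug("removing storedLargeMessage {}", messageID);
        }
    }
}
```

Without the gate, the `long` argument is boxed on every call even when debug logging is off, which matters inside a loop over paged messages.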
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java:
##########
@@ -1272,13 +1280,16 @@ final class MutableLong {
journalLoader.handleAddMessage(queueMap);
- loadPreparedTransactions(postOffice, pagingManager, resourceManager, queueInfos, preparedTransactions, this::failedToPrepareException, pageSubscriptions, pendingLargeMessages, journalLoader);
+ loadPreparedTransactions(postOffice, pagingManager, resourceManager, queueInfos, preparedTransactions, this::failedToPrepareException, pageSubscriptions, pendingLargeMessages, storedLargeMessages, journalLoader);
for (PageSubscription sub : pageSubscriptions.values()) {
sub.getCounter().processReload();
}
for (LargeServerMessage msg : largeMessages) {
+ if (storedLargeMessages != null && storedLargeMessages.remove(msg.getMessageID())) {
+ logger.debug("Large message in folder removed on {}", msg.getMessageID());
Review Comment:
Ditto re: gate.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java:
##########
@@ -797,6 +764,14 @@ private Map<Long, Pair<String, Long>> recoverPendingLargeMessages() throws Excep
return largeMessages;
}
+ @Override
+ public void recoverLargeMessagesOnFolder(Set<Long> files) throws Exception {
+ List<String> filenames = largeMessagesFactory.listFiles("msg");
+ filenames.forEach(f -> {
+ files.add(getLargeMessageIdFromFilename(f));
+ });
+ }
Review Comment:
Might be clearer to call the Set 'messageIDs' or something similar, given it
doesn't really contain files/names in the end.
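A rough sketch of what the rename could look like. The `idFromFilename` helper is hypothetical and assumes a file name of the form `<messageID>.msg`; it is not the actual `getLargeMessageIdFromFilename` implementation, and the file listing is passed in as a plain list to keep the snippet self-contained.

```java
import java.util.List;
import java.util.Set;

final class LargeMessageIdsSketch {

    /** Assumption: a large message file is named "<messageID>.msg". */
    static long idFromFilename(String filename) {
        int dot = filename.lastIndexOf('.');
        String base = dot < 0 ? filename : filename.substring(0, dot);
        return Long.parseLong(base);
    }

    /** Adds the ID parsed from each file name to the caller's set. */
    static void recoverLargeMessagesOnFolder(List<String> filenames, Set<Long> messageIDs) {
        for (String filename : filenames) {
            messageIDs.add(idFromFilename(filename));
        }
    }
}
```

With the parameter named `messageIDs`, the signature says what the caller gets back: IDs, not file names.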
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java:
##########
@@ -3381,10 +3382,23 @@ synchronized void initialisePart2(boolean scalingDown) throws Exception {
pagingManager.reloadStores();
- JournalLoadInformation[] journalInfo = loadJournals();
+ Set<Long> storedLargeMessages = new HashSet<>();
+ JournalLoadInformation[] journalInfo = loadJournals(storedLargeMessages);
if (rebuildCounters) {
- pagingManager.rebuildCounters();
+ pagingManager.rebuildCounters(storedLargeMessages);
+
+ pagingManager.execute(() -> {
+ storedLargeMessages.forEach(id -> {
+ try {
+ SequentialFile file = storageManager.createFileForLargeMessage(id, true);
+ logger.debug("Removing pending large message for file={}", file);
+ file.delete();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
Review Comment:
Might be good to give some context in the log message about what was being
done when the exception occurred. May not be so obvious to a user from the
exception message itself.
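Something along these lines, as a sketch only. `FileDeleter` is a hypothetical stand-in for the create-file-and-delete step, the message wording is just an example, and the warnings are collected into a list instead of being sent to the logger so the snippet runs on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class OrphanCleanupSketch {

    /** Hypothetical stand-in for creating and deleting the large message file. */
    interface FileDeleter {
        void delete(long messageID) throws Exception;
    }

    /**
     * Tries to delete the file for every ID left in the set and returns the
     * warning text that would be logged for each failure.
     */
    static List<String> removeOrphans(Set<Long> storedLargeMessages, FileDeleter deleter) {
        List<String> warnings = new ArrayList<>();
        for (Long id : storedLargeMessages) {
            try {
                deleter.delete(id);
            } catch (Exception e) {
                // State what was being attempted and for which message,
                // then append the exception's own message.
                warnings.add("Failed to remove orphaned large message file for messageID=" + id + ": " + e.getMessage());
            }
        }
        return warnings;
    }
}
```

In the real code the same text would go to `logger.warn(...)` with the exception as the last argument, so the stack trace is still kept.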
##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.interruptlm;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// This is used to kill a server and make sure the server will remove any pending files.
+public class LargeMessageInterruptTest extends SoakTestBase {
+
+ public static final String SERVER_NAME_0 = "interruptlm";
+ private static final String JMX_SERVER_HOSTNAME = "localhost";
+ private static final int JMX_SERVER_PORT_0 = 1099;
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ static String liveURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
+ static ObjectNameBuilder liveNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "lminterrupt", true);
+ Process serverProcess;
+
+ public ConnectionFactory createConnectionFactory(String protocol) {
+ return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+ }
+
+ @Before
+ public void before() throws Exception {
+ cleanupData(SERVER_NAME_0);
+ serverProcess = startServer(SERVER_NAME_0, 0, 30000);
+ disableCheckThread();
+ }
+
+ @Test
+ public void testInterruptLargeMessageAMQPTX() throws Throwable {
+ testInterruptLM("AMQP", true, false);
+ }
+
+ @Test
+ public void testInterruptLargeMessageAMQPTXPaging() throws Throwable {
+ testInterruptLM("AMQP", true, true);
+ }
+
+ @Test
+ public void testInterruptLargeMessageCORETX() throws Throwable {
+ testInterruptLM("CORE", true, false);
+ }
+
+ @Test
+ public void testInterruptLargeMessageCORETXPaging() throws Throwable {
+ testInterruptLM("CORE", true, true);
+ }
+
+
+ @Test
+ public void testInterruptLargeMessageOPENWIRETX() throws Throwable {
+ testInterruptLM("OPENWIRE", true, false);
+ }
+
+ @Test
+ public void testInterruptLargeMessageOPENWIRETXPaging() throws Throwable {
+ testInterruptLM("OPENWIRE", true, true);
+ }
+
+
+ @Test
+ public void testInterruptLargeMessageAMQPNonTX() throws Throwable {
+ testInterruptLM("AMQP", false, false);
+ }
+
+ @Test
+ public void testInterruptLargeMessageAMQPNonTXPaging() throws Throwable {
+ testInterruptLM("AMQP", false, true);
+ }
+
+ @Test
+ public void testInterruptLargeMessageCORENonTX() throws Throwable {
+ testInterruptLM("CORE", false, false);
+ }
+
+ @Test
+ public void testInterruptLargeMessageCORENonTXPaging() throws Throwable {
+ testInterruptLM("CORE", false, true);
+ }
+
+ private void testInterruptLM(String protocol, boolean tx, boolean paging) throws Throwable {
+ final int BODY_SIZE = 500 * 1024;
+ final int NUMBER_OF_MESSAGES = 10; // this is per producer
+ final int SENDING_THREADS = 10;
+ CyclicBarrier startFlag = new CyclicBarrier(SENDING_THREADS);
+ final CountDownLatch done = new CountDownLatch(SENDING_THREADS);
+ final AtomicInteger produced = new AtomicInteger(0);
+ final ConnectionFactory factory = createConnectionFactory(protocol);
+ final AtomicInteger errors = new AtomicInteger(0); // I don't expect many errors since this test is disconnecting and reconnecting the server
+ final CountDownLatch killAt = new CountDownLatch(40);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS);
+ runAfter(executorService::shutdownNow);
+
+ String queueName = "LargeMessageInterruptTest";
+
+ String largebody;
+
+ {
+ StringBuffer buffer = new StringBuffer();
+ while (buffer.length() < BODY_SIZE) {
+ buffer.append("LOREM IPSUM WHATEVER THEY SAY IN THERE I DON'T REALLY CARE. I'M NOT SURE IF IT'S LOREM, LAUREM, LAUREN, IPSUM OR YPSUM AND I DON'T REALLY CARE ");
+ }
+ largebody = buffer.toString();
+ }
+
+ if (paging) {
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(session.createQueue(queueName));
+ for (int i = 0; i < 1000; i++) {
+ producer.send(session.createTextMessage("forcePage"));
+ }
+ session.commit();
+ }
+ }
+
+ for (int i = 0; i < SENDING_THREADS; i++) {
+ executorService.execute(() -> {
+ int numberOfMessages = 0;
+ try {
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(session.createQueue(queueName));
+
+ startFlag.await(10, TimeUnit.SECONDS);
+ while (numberOfMessages < NUMBER_OF_MESSAGES) {
+ try {
+ producer.send(session.createTextMessage(largebody));
+ if (tx) {
+ session.commit();
+ }
+ produced.incrementAndGet();
+ killAt.countDown();
+ if (numberOfMessages++ % 10 == 0) {
+ logger.info("Sent {}", numberOfMessages);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+
+ logger.warn(e.getMessage(), e);
+ try {
+ connection.close();
+ } catch (Throwable ignored) {
+ }
+
+ for (int retryNumber = 0; retryNumber < 100; retryNumber++) {
+ try {
+ Connection ctest = factory.createConnection();
+ ctest.close();
+ break;
+ } catch (Throwable retry) {
+ Thread.sleep(100);
+ }
+ }
+
+ connection = factory.createConnection();
+ session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(session.createQueue(queueName));
+ connection.start();
+
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("Error getting the initial connection", e);
+ errors.incrementAndGet();
+ }
+
+ logger.info("Done sending");
+ done.countDown();
+ });
+ }
+
+ Assert.assertTrue(killAt.await(60, TimeUnit.SECONDS));
+ serverProcess.destroyForcibly();
+ serverProcess = startServer(SERVER_NAME_0, 0, 0);
+ QueueControl queueControl = getQueueControl(liveURI, liveNameBuilder, queueName, queueName, RoutingType.ANYCAST, 5000);
+
+ Assert.assertTrue(done.await(60, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+
+ long numberOfMessages = queueControl.getMessageCount();
+ logger.info("there are {} messages", numberOfMessages);
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
+ connection.start();
+ for (int i = 0; i < numberOfMessages; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertTrue(message.getText().equals("forcePage") || message.getText().equals(largebody));
+ Assert.assertNotNull(message);
Review Comment:
Redundant, already checked and used above
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java:
##########
@@ -1839,6 +1838,9 @@ private void loadSinglePreparedTransaction(PostOffice postOffice,
}
case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
Message message = decodeMessage(pools, buff);
+ if (storedLargeMessages != null && message.isLargeMessage() && storedLargeMessages.remove(record.id)) {
+ logger.debug("PreparedTX load removing stored large message {}", record.id);
Review Comment:
Might be useful to differentiate this message from the currently identical
log message above, to help discern which one occurred without resorting to
line-number output comparisons. (Aside: I'm just assuming there is some
'known'/'obvious' reason for the 2 different record types doing such a similar
thing, to the extent of having such similar log messages.)
Also, guessing the same about a gate.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java:
##########
@@ -1829,6 +1825,9 @@ private void loadSinglePreparedTransaction(PostOffice postOffice,
switch (recordType) {
case JournalRecordIds.ADD_LARGE_MESSAGE: {
+ if (storedLargeMessages != null && storedLargeMessages.remove(record.id)) {
+ logger.debug("PreparedTX load removing stored large message {}", record.id);
Review Comment:
guessing the same here
##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.interruptlm;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// This is used to kill a server and make sure the server will remove any pending files.
+public class LargeMessageInterruptTest extends SoakTestBase {
+
+ public static final String SERVER_NAME_0 = "interruptlm";
+ private static final String JMX_SERVER_HOSTNAME = "localhost";
+ private static final int JMX_SERVER_PORT_0 = 1099;
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ static String liveURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
+ static ObjectNameBuilder liveNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "lminterrupt", true);
+ Process serverProcess;
+
+ public ConnectionFactory createConnectionFactory(String protocol) {
+ return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+ }
+
+ @Before
+ public void before() throws Exception {
+ cleanupData(SERVER_NAME_0);
+ serverProcess = startServer(SERVER_NAME_0, 0, 30000);
+ disableCheckThread();
+ }
+
+ @Test
+ public void testInterruptLargeMessageAMQPTX() throws Throwable {
+ testInterruptLM("AMQP", true, false);
+ }
+
+ @Test
+ public void testInterruptLargeMessageAMQPTXPaging() throws Throwable {
+ testInterruptLM("AMQP", true, true);
+ }
+
+ @Test
+ public void testInterruptLargeMessageCORETX() throws Throwable {
+ testInterruptLM("CORE", true, false);
+ }
+
+ @Test
+ public void testInterruptLargeMessageCORETXPaging() throws Throwable {
+ testInterruptLM("CORE", true, true);
+ }
+
+
+ @Test
+ public void testInterruptLargeMessageOPENWIRETX() throws Throwable {
+ testInterruptLM("OPENWIRE", true, false);
+ }
+
+ @Test
+ public void testInterruptLargeMessageOPENWIRETXPaging() throws Throwable {
+ testInterruptLM("OPENWIRE", true, true);
+ }
+
+
+ @Test
+ public void testInterruptLargeMessageAMQPNonTX() throws Throwable {
+ testInterruptLM("AMQP", false, false);
+ }
+
+ @Test
+ public void testInterruptLargeMessageAMQPNonTXPaging() throws Throwable {
+ testInterruptLM("AMQP", false, true);
+ }
+
+ @Test
+ public void testInterruptLargeMessageCORENonTX() throws Throwable {
+ testInterruptLM("CORE", false, false);
+ }
+
+ @Test
+ public void testInterruptLargeMessageCORENonTXPaging() throws Throwable {
+ testInterruptLM("CORE", false, true);
+ }
+
+ private void testInterruptLM(String protocol, boolean tx, boolean paging) throws Throwable {
+ final int BODY_SIZE = 500 * 1024;
+ final int NUMBER_OF_MESSAGES = 10; // this is per producer
+ final int SENDING_THREADS = 10;
+ CyclicBarrier startFlag = new CyclicBarrier(SENDING_THREADS);
+ final CountDownLatch done = new CountDownLatch(SENDING_THREADS);
+ final AtomicInteger produced = new AtomicInteger(0);
+ final ConnectionFactory factory = createConnectionFactory(protocol);
+ final AtomicInteger errors = new AtomicInteger(0); // I don't expect many errors since this test is disconnecting and reconnecting the server
+ final CountDownLatch killAt = new CountDownLatch(40);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(SENDING_THREADS);
+ runAfter(executorService::shutdownNow);
+
+ String queueName = "LargeMessageInterruptTest";
+
+ String largebody;
+
+ {
+ StringBuffer buffer = new StringBuffer();
+ while (buffer.length() < BODY_SIZE) {
+ buffer.append("LOREM IPSUM WHATEVER THEY SAY IN THERE I DON'T REALLY CARE. I'M NOT SURE IF IT'S LOREM, LAUREM, LAUREN, IPSUM OR YPSUM AND I DON'T REALLY CARE ");
+ }
+ largebody = buffer.toString();
+ }
+
+ if (paging) {
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(session.createQueue(queueName));
+ for (int i = 0; i < 1000; i++) {
+ producer.send(session.createTextMessage("forcePage"));
+ }
+ session.commit();
+ }
+ }
+
+ for (int i = 0; i < SENDING_THREADS; i++) {
+ executorService.execute(() -> {
+ int numberOfMessages = 0;
+ try {
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(session.createQueue(queueName));
+
+ startFlag.await(10, TimeUnit.SECONDS);
+ while (numberOfMessages < NUMBER_OF_MESSAGES) {
+ try {
+ producer.send(session.createTextMessage(largebody));
+ if (tx) {
+ session.commit();
+ }
+ produced.incrementAndGet();
+ killAt.countDown();
+ if (numberOfMessages++ % 10 == 0) {
+ logger.info("Sent {}", numberOfMessages);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+
+ logger.warn(e.getMessage(), e);
+ try {
+ connection.close();
+ } catch (Throwable ignored) {
+ }
+
+ for (int retryNumber = 0; retryNumber < 100; retryNumber++) {
+ try {
+ Connection ctest = factory.createConnection();
+ ctest.close();
+ break;
+ } catch (Throwable retry) {
+ Thread.sleep(100);
+ }
+ }
+
+ connection = factory.createConnection();
+ session = connection.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(session.createQueue(queueName));
+ connection.start();
+
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("Error getting the initial connection", e);
+ errors.incrementAndGet();
+ }
+
+ logger.info("Done sending");
+ done.countDown();
+ });
+ }
+
+ Assert.assertTrue(killAt.await(60, TimeUnit.SECONDS));
+ serverProcess.destroyForcibly();
+ serverProcess = startServer(SERVER_NAME_0, 0, 0);
+ QueueControl queueControl = getQueueControl(liveURI, liveNameBuilder, queueName, queueName, RoutingType.ANYCAST, 5000);
+
+ Assert.assertTrue(done.await(60, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+
+ long numberOfMessages = queueControl.getMessageCount();
Review Comment:
It's not clear to me why this goes to the effort of starting the broker
without waiting, then spinning in a loop trying to connect to management to
get a QueueControl (potentially waiting 500ms at a time, and spewing stack
traces, while doing it), only to not use the QueueControl until after it is
done waiting for sending to complete?
Issue Time Tracking
-------------------
Worklog Id: (was: 849807)
Time Spent: 1h (was: 50m)
> Interrupting Large Message Streaming with a server kill may leave orphaned
> files
> --------------------------------------------------------------------------------
>
> Key: ARTEMIS-4193
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4193
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Affects Versions: 2.28.0
> Reporter: Clebert Suconic
> Priority: Major
> Fix For: 2.29.0
>
> Time Spent: 1h
> Remaining Estimate: 0h
>
> There's a schema in the journal to store pending records before a file is
> created.
> However, the sync is not properly applied, and if the server is killed it could
> leave a few messages orphaned in the file system.