This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new fd1ef367d3 ARTEMIS-4670 Slow processing with Large Messages and JDBC
fd1ef367d3 is described below
commit fd1ef367d358e480f7f685183c64b87cf86a6262
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Mar 5 18:58:04 2024 -0500
ARTEMIS-4670 Slow processing with Large Messages and JDBC
---
.../jdbc/store/file/JDBCSequentialFile.java | 6 +-
.../impl/journal/JDBCJournalStorageManager.java | 32 +++++
.../impl/journal/JournalStorageManager.java | 4 +-
.../core/persistence/impl/journal/LargeBody.java | 4 +-
.../RealServerDatabaseLargeMessageTest.java | 156 +++++++++++++++++++++
.../tests/integration/client/LargeMessageTest.java | 3 +-
6 files changed, 199 insertions(+), 6 deletions(-)
diff --git
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index e2becc4e82..a8f2b763c8 100644
---
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -331,10 +331,12 @@ public class JDBCSequentialFile implements SequentialFile
{
@Override
public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback
callback) {
if (callback == null) {
- SimpleWaitIOCallback waitIOCallback = new SimpleWaitIOCallback();
+ final SimpleWaitIOCallback waitIOCallback = sync ? new
SimpleWaitIOCallback() : null;
try {
scheduleWrite(bytes, waitIOCallback);
- waitIOCallback.waitCompletion();
+ if (waitIOCallback != null) {
+ waitIOCallback.waitCompletion();
+ }
} catch (Exception e) {
waitIOCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(),
"Error writing to JDBC file.");
fileFactory.onIOError(e, "Failed to write to file.", this);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index a0a2035435..9d129adb17 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -22,15 +22,19 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.config.Configuration;
import
org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.slf4j.Logger;
@@ -98,6 +102,34 @@ public class JDBCJournalStorageManager extends
JournalStorageManager {
}
}
+
+ @Override
+ public final void addBytesToLargeMessage(final SequentialFile file,
+ final long messageId,
+ final byte[] bytes) throws
Exception {
+ // we override the method as we must get the context and increment it
+ // we also remove some of the replicator and retention stuff that does
not make sense in JDBC
+ try (ArtemisCloseable lock = closeableReadLock()) {
+ OperationContext context = getContext(true);
+ context.storeLineUp();
+ file.writeDirect(ByteBuffer.wrap(bytes), false, context);
+ }
+ }
+
+ @Override
+ public final void addBytesToLargeMessage(final SequentialFile file,
+ final long messageId,
+ final ActiveMQBuffer bytes) throws
Exception {
+ // we override the method as we cannot set the position of the file in
JDBC Files. otherwise multiple
+ // scheduleWrites would get the content wrong, unless we synchronized
(wait to completion) before the next writes.
+ // we also remove some of the replicator and retention stuff that does
not make sense in JDBC
+ try (ArtemisCloseable lock = closeableReadLock()) {
+ final byte[] copiedBytes = new byte[bytes.readableBytes()];
+ bytes.readBytes(copiedBytes);
+ addBytesToLargeMessage(file, messageId, copiedBytes);
+ }
+ }
+
@Override
public synchronized void stop(boolean ioCriticalError, boolean
sendFailover) throws Exception {
if (!started) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 9ba9861af3..85c1b4bd2d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -822,7 +822,7 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
}
@Override
- public final void addBytesToLargeMessage(final SequentialFile file,
+ public void addBytesToLargeMessage(final SequentialFile file,
final long messageId,
final ActiveMQBuffer bytes) throws
Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
@@ -859,7 +859,7 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
}
@Override
- public final void addBytesToLargeMessage(final SequentialFile file,
+ public void addBytesToLargeMessage(final SequentialFile file,
final long messageId,
final byte[] bytes) throws
Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
index 99e217513f..4cf1062ff0 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
@@ -358,7 +358,8 @@ public class LargeBody {
byte[] bufferToWrite;
if (bytesRead <= 0) {
break;
- } else if (bytesRead == bufferBytes.length && this.storageManager
instanceof JournalStorageManager && !((JournalStorageManager)
this.storageManager).isReplicated()) {
+ } else if ((bytesRead == bufferBytes.length && this.storageManager
instanceof JournalStorageManager && !((JournalStorageManager)
this.storageManager).isReplicated() &&
+ !(this.storageManager instanceof
JDBCJournalStorageManager))) {
// ARTEMIS-1220: We cannot reuse the same buffer if it's
replicated
// otherwise there could be another thread still using the
buffer on a
// replication.
@@ -374,6 +375,7 @@ public class LargeBody {
break;
}
}
+ newMessage.releaseResources(true, false);
} finally {
cloneFile.close();
}
diff --git
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java
new file mode 100644
index 0000000000..d65b0fed9d
--- /dev/null
+++
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.db.largeMessages;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.tests.db.common.Database;
+import org.apache.activemq.artemis.tests.db.common.ParameterDBTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.activemq.artemis.utils.TestParameters.testProperty;
+
+public class RealServerDatabaseLargeMessageTest extends ParameterDBTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String TEST_NAME = "LMDB";
+
+ private static final int MESSAGE_SIZE =
Integer.parseInt(testProperty(TEST_NAME, "MESSAGE_SIZE", "300000"));
+
+ private static final int PRODUCERS = 50;
+
+ private static final int MESSAGES_PER_PRODUCER = 2;
+
+ private ExecutorService executorService;
+
+ Process serverProcess;
+
+ @Parameterized.Parameters(name = "db={0}")
+ public static Collection<Object[]> parameters() {
+ return convertParameters(Database.selectedList());
+ }
+
+
+ @Before
+ public void before() throws Exception {
+ serverProcess = startServer(database.getName(), 0, 60_000);
+ executorService = Executors.newFixedThreadPool(PRODUCERS + 1); // there
will be one consumer
+ runAfter(executorService::shutdownNow);
+ }
+
+ @Test
+ public void testLargeMessage() throws Exception {
+ testLargeMessage("CORE");
+ testLargeMessage("AMQP");
+ testLargeMessage("OPENWIRE");
+ }
+
+ public void testLargeMessage(String protocol) throws Exception {
+ logger.info("testLargeMessage({})", protocol);
+ final String queueName = "QUEUE_" + RandomUtil.randomString() + "_" +
protocol + "_" + database;
+
+ ConnectionFactory connectionFactory =
CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+
+ AtomicInteger errors = new AtomicInteger(0);
+ CyclicBarrier startflag = new CyclicBarrier(PRODUCERS);
+ CountDownLatch done = new CountDownLatch(PRODUCERS + 1);
+
+ byte[] messageLoad = RandomUtil.randomBytes(MESSAGE_SIZE);
+
+ for (int i = 0; i < PRODUCERS; i++) {
+ executorService.execute(() -> {
+ try {
+ try (Connection connection =
connectionFactory.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+
+ // align all producers right before start sending
+ startflag.await(5, TimeUnit.SECONDS);
+
+ for (int messageI = 0; messageI < MESSAGES_PER_PRODUCER;
messageI++) {
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(messageLoad);
+ producer.send(message);
+ session.commit();
+ }
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ executorService.execute(() -> {
+ try (Connection connection = connectionFactory.createConnection()) {
+ connection.start();
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queueName);
+ MessageConsumer consumer = session.createConsumer(queue);
+ for (int messageI = 0; messageI < PRODUCERS *
MESSAGES_PER_PRODUCER; messageI++) {
+ BytesMessage message = (BytesMessage) consumer.receive(10_000);
+ Assert.assertNotNull(message);
+ logger.debug("Received message");
+ Assert.assertEquals(messageLoad.length,
message.getBodyLength());
+ byte[] messageRead = new byte[(int)message.getBodyLength()];
+ message.readBytes(messageRead);
+ Assert.assertArrayEquals(messageLoad, messageRead);
+ if (messageI % 5 == 0) {
+ session.commit();
+ }
+ }
+ session.commit();
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ });
+
+ Assert.assertTrue(done.await(120, TimeUnit.SECONDS));
+ Assert.assertEquals(0, errors.get());
+
+ }
+
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index e1576cdb81..a096c3a840 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -2607,6 +2607,7 @@ public class LargeMessageTest extends
LargeMessageTestBase {
for (int i = 0; i < largeMessageSize; i++) {
fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
}
+ fileMessage.releaseResources(true, false);
Assert.assertEquals(largeMessageSize, fileMessage.getBodyBufferSize());
@@ -2725,7 +2726,7 @@ public class LargeMessageTest extends
LargeMessageTestBase {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE,
largeMessageSize);
- fileMessage.releaseResources(false, false);
+ fileMessage.releaseResources(true, false);
prod.send(fileMessage);