This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new fd1ef367d3 ARTEMIS-4670 Slow processing with Large Messages and JDBC
fd1ef367d3 is described below

commit fd1ef367d358e480f7f685183c64b87cf86a6262
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Mar 5 18:58:04 2024 -0500

    ARTEMIS-4670 Slow processing with Large Messages and JDBC
---
 .../jdbc/store/file/JDBCSequentialFile.java        |   6 +-
 .../impl/journal/JDBCJournalStorageManager.java    |  32 +++++
 .../impl/journal/JournalStorageManager.java        |   4 +-
 .../core/persistence/impl/journal/LargeBody.java   |   4 +-
 .../RealServerDatabaseLargeMessageTest.java        | 156 +++++++++++++++++++++
 .../tests/integration/client/LargeMessageTest.java |   3 +-
 6 files changed, 199 insertions(+), 6 deletions(-)

diff --git 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index e2becc4e82..a8f2b763c8 100644
--- 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -331,10 +331,12 @@ public class JDBCSequentialFile implements SequentialFile 
{
    @Override
    public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback 
callback) {
       if (callback == null) {
-         SimpleWaitIOCallback waitIOCallback = new SimpleWaitIOCallback();
+         final SimpleWaitIOCallback waitIOCallback = sync ? new 
SimpleWaitIOCallback() : null;
          try {
             scheduleWrite(bytes, waitIOCallback);
-            waitIOCallback.waitCompletion();
+            if (waitIOCallback != null) {
+               waitIOCallback.waitCompletion();
+            }
          } catch (Exception e) {
             waitIOCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), 
"Error writing to JDBC file.");
             fileFactory.onIOError(e, "Failed to write to file.", this);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
index a0a2035435..9d129adb17 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java
@@ -22,15 +22,19 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.config.Configuration;
 import 
org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
 import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
 import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
 import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.utils.ArtemisCloseable;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 import org.slf4j.Logger;
@@ -98,6 +102,34 @@ public class JDBCJournalStorageManager extends 
JournalStorageManager {
       }
    }
 
+
+   @Override
+   public final void addBytesToLargeMessage(final SequentialFile file,
+                                            final long messageId,
+                                            final byte[] bytes) throws 
Exception {
+      // We override this method because we must get the operation context and line up the store on it.
+      // We also drop the replicator and retention handling, which does 
not apply to JDBC.
+      try (ArtemisCloseable lock = closeableReadLock()) {
+         OperationContext context = getContext(true);
+         context.storeLineUp();
+         file.writeDirect(ByteBuffer.wrap(bytes), false, context);
+      }
+   }
+
+   @Override
+   public final void addBytesToLargeMessage(final SequentialFile file,
+                                            final long messageId,
+                                            final ActiveMQBuffer bytes) throws 
Exception {
+      // We override this method because the file position cannot be set on 
JDBC files; otherwise multiple
+      // scheduleWrite calls would get the content wrong unless we synchronized 
(waited for completion) before the next write.
+      // We also drop the replicator and retention handling, which does 
not apply to JDBC.
+      try (ArtemisCloseable lock = closeableReadLock()) {
+         final byte[] copiedBytes = new byte[bytes.readableBytes()];
+         bytes.readBytes(copiedBytes);
+         addBytesToLargeMessage(file, messageId, copiedBytes);
+      }
+   }
+
    @Override
    public synchronized void stop(boolean ioCriticalError, boolean 
sendFailover) throws Exception {
       if (!started) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 9ba9861af3..85c1b4bd2d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -822,7 +822,7 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
    }
 
    @Override
-   public final void addBytesToLargeMessage(final SequentialFile file,
+   public void addBytesToLargeMessage(final SequentialFile file,
                                             final long messageId,
                                             final ActiveMQBuffer bytes) throws 
Exception {
       try (ArtemisCloseable lock = closeableReadLock()) {
@@ -859,7 +859,7 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
    }
 
    @Override
-   public final void addBytesToLargeMessage(final SequentialFile file,
+   public void addBytesToLargeMessage(final SequentialFile file,
                                             final long messageId,
                                             final byte[] bytes) throws 
Exception {
       try (ArtemisCloseable lock = closeableReadLock()) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
index 99e217513f..4cf1062ff0 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
@@ -358,7 +358,8 @@ public class LargeBody {
             byte[] bufferToWrite;
             if (bytesRead <= 0) {
                break;
-            } else if (bytesRead == bufferBytes.length && this.storageManager 
instanceof JournalStorageManager && !((JournalStorageManager) 
this.storageManager).isReplicated()) {
+            } else if ((bytesRead == bufferBytes.length && this.storageManager 
instanceof JournalStorageManager && !((JournalStorageManager) 
this.storageManager).isReplicated() &&
+                        !(this.storageManager instanceof 
JDBCJournalStorageManager))) {
                // ARTEMIS-1220: We cannot reuse the same buffer if it's 
replicated
                // otherwise there could be another thread still using the 
buffer on a
                // replication.
@@ -374,6 +375,7 @@ public class LargeBody {
                break;
             }
          }
+         newMessage.releaseResources(true, false);
       } finally {
          cloneFile.close();
       }
diff --git 
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java
 
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java
new file mode 100644
index 0000000000..d65b0fed9d
--- /dev/null
+++ 
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.db.largeMessages;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.tests.db.common.Database;
+import org.apache.activemq.artemis.tests.db.common.ParameterDBTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.activemq.artemis.utils.TestParameters.testProperty;
+
+public class RealServerDatabaseLargeMessageTest extends ParameterDBTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String TEST_NAME = "LMDB";
+
+   private static final int MESSAGE_SIZE = 
Integer.parseInt(testProperty(TEST_NAME, "MESSAGE_SIZE", "300000"));
+
+   private static final int PRODUCERS = 50;
+
+   private static final int MESSAGES_PER_PRODUCER = 2;
+
+   private ExecutorService executorService;
+
+   Process serverProcess;
+
+   @Parameterized.Parameters(name = "db={0}")
+   public static Collection<Object[]> parameters() {
+      return convertParameters(Database.selectedList());
+   }
+
+
+   @Before
+   public void before() throws Exception {
+      serverProcess = startServer(database.getName(), 0, 60_000);
+      executorService = Executors.newFixedThreadPool(PRODUCERS + 1); // there 
will be one consumer
+      runAfter(executorService::shutdownNow);
+   }
+
+   @Test
+   public void testLargeMessage() throws Exception {
+      testLargeMessage("CORE");
+      testLargeMessage("AMQP");
+      testLargeMessage("OPENWIRE");
+   }
+
+   public void testLargeMessage(String protocol) throws Exception {
+      logger.info("testLargeMessage({})", protocol);
+      final String queueName = "QUEUE_" + RandomUtil.randomString() + "_" + 
protocol + "_" + database;
+
+      ConnectionFactory connectionFactory = 
CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+
+      AtomicInteger errors = new AtomicInteger(0);
+      CyclicBarrier startflag = new CyclicBarrier(PRODUCERS);
+      CountDownLatch done = new CountDownLatch(PRODUCERS + 1);
+
+      byte[] messageLoad = RandomUtil.randomBytes(MESSAGE_SIZE);
+
+      for (int i = 0; i < PRODUCERS; i++) {
+         executorService.execute(() -> {
+            try {
+               try (Connection connection = 
connectionFactory.createConnection()) {
+                  Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+                  Queue queue = session.createQueue(queueName);
+                  MessageProducer producer = session.createProducer(queue);
+
+                  // align all producers right before start sending
+                  startflag.await(5, TimeUnit.SECONDS);
+
+                  for (int messageI = 0; messageI < MESSAGES_PER_PRODUCER; 
messageI++) {
+                     BytesMessage message = session.createBytesMessage();
+                     message.writeBytes(messageLoad);
+                     producer.send(message);
+                     session.commit();
+                  }
+               }
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+               errors.incrementAndGet();
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      executorService.execute(() -> {
+         try (Connection connection = connectionFactory.createConnection()) {
+            connection.start();
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue(queueName);
+            MessageConsumer consumer = session.createConsumer(queue);
+            for (int messageI = 0; messageI < PRODUCERS * 
MESSAGES_PER_PRODUCER; messageI++) {
+               BytesMessage message = (BytesMessage) consumer.receive(10_000);
+               Assert.assertNotNull(message);
+               logger.debug("Received message");
+               Assert.assertEquals(messageLoad.length, 
message.getBodyLength());
+               byte[] messageRead = new byte[(int)message.getBodyLength()];
+               message.readBytes(messageRead);
+               Assert.assertArrayEquals(messageLoad, messageRead);
+               if (messageI % 5 == 0) {
+                  session.commit();
+               }
+            }
+            session.commit();
+         } catch (Throwable e) {
+            logger.warn(e.getMessage(), e);
+            errors.incrementAndGet();
+         } finally {
+            done.countDown();
+         }
+      });
+
+      Assert.assertTrue(done.await(120, TimeUnit.SECONDS));
+      Assert.assertEquals(0, errors.get());
+
+   }
+
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index e1576cdb81..a096c3a840 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -2607,6 +2607,7 @@ public class LargeMessageTest extends 
LargeMessageTestBase {
       for (int i = 0; i < largeMessageSize; i++) {
          fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
       }
+      fileMessage.releaseResources(true, false);
 
       Assert.assertEquals(largeMessageSize, fileMessage.getBodyBufferSize());
 
@@ -2725,7 +2726,7 @@ public class LargeMessageTest extends 
LargeMessageTestBase {
       // The server would be doing this
       fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 
largeMessageSize);
 
-      fileMessage.releaseResources(false, false);
+      fileMessage.releaseResources(true, false);
 
       prod.send(fileMessage);
 

Reply via email to