This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new c5b81d929d ARTEMIS-4682 Improving performance of reloading JDBC storage
c5b81d929d is described below

commit c5b81d929dcfa4e7bdee98e1ca8f97b48ecbb396
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Mar 11 17:09:20 2024 -0400

    ARTEMIS-4682 Improving performance of reloading JDBC storage
    
    - fixing leaks
    - page.exists() used to list all files (a select(all)) and then search
      the result for the file name; it now looks up the file ID directly.
---
 .../jdbc/store/file/JDBCSequentialFile.java        | 28 ++++++----------
 .../jdbc/store/file/JDBCSequentialFileFactory.java | 29 +++++-----------
 .../file/JDBCSequentialFileFactoryDriver.java      |  4 +--
 .../db/paging/RealServerDatabasePagingTest.java    | 39 ++++++++++++++++------
 4 files changed, 50 insertions(+), 50 deletions(-)

diff --git 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index a8f2b763c8..5289313ba3 100644
--- 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -67,8 +67,6 @@ public class JDBCSequentialFile implements SequentialFile {
 
    private final JDBCSequentialFileFactory fileFactory;
 
-   private final Object writeLock;
-
    private final JDBCSequentialFileFactoryDriver dbDriver;
 
    MpscUnboundedArrayQueue<ScheduledWrite> writeQueue = new 
MpscUnboundedArrayQueue<>(8192);
@@ -89,13 +87,11 @@ public class JDBCSequentialFile implements SequentialFile {
                       final Executor executor,
                       final ScheduledExecutorService scheduledExecutorService,
                       final long syncDelay,
-                      final JDBCSequentialFileFactoryDriver driver,
-                      final Object writeLock) throws SQLException {
+                      final JDBCSequentialFileFactoryDriver driver) throws 
SQLException {
       this.fileFactory = fileFactory;
       this.filename = filename;
       this.extension = filename.contains(".") ? 
filename.substring(filename.lastIndexOf(".") + 1, filename.length()) : "";
       this.executor = executor;
-      this.writeLock = writeLock;
       this.dbDriver = driver;
       this.scheduledExecutorService = scheduledExecutorService;
       this.syncDelay = syncDelay;
@@ -113,14 +109,10 @@ public class JDBCSequentialFile implements SequentialFile 
{
 
    @Override
    public boolean exists() {
-      if (isLoaded.get()) return true;
       try {
-         return fileFactory.listFiles(extension).contains(filename);
-      } catch (Exception e) {
-         logger.debug(e.getMessage(), e);
-         // this shouldn't throw a critical IO Error
-         // as if the destination does not exists (ot table store removed), 
the table will not exist and
-         // we may get a SQL Exception
+         return dbDriver.getFileID(this) >= 0;
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
          return false;
       }
    }
@@ -178,7 +170,7 @@ public class JDBCSequentialFile implements SequentialFile {
    @Override
    public void delete() throws IOException, InterruptedException, 
ActiveMQException {
       try {
-         synchronized (writeLock) {
+         synchronized (this) {
             if (load()) {
                dbDriver.deleteFile(this);
             }
@@ -193,7 +185,7 @@ public class JDBCSequentialFile implements SequentialFile {
    private synchronized int jdbcWrite(byte[] data, IOCallback callback, 
boolean append) {
       try {
          logger.debug("Writing {} bytes into {}", data.length, filename);
-         synchronized (writeLock) {
+         synchronized (this) {
             int noBytes = dbDriver.writeToFile(this, data, append);
             seek(append ? writePosition + noBytes : noBytes);
             if (logger.isTraceEnabled()) {
@@ -360,7 +352,7 @@ public class JDBCSequentialFile implements SequentialFile {
 
    @Override
    public synchronized int read(ByteBuffer bytes, final IOCallback callback) 
throws SQLException {
-      synchronized (writeLock) {
+      synchronized (this) {
          try {
             int read = dbDriver.readFromFile(this, bytes);
             readPosition += read;
@@ -439,7 +431,7 @@ public class JDBCSequentialFile implements SequentialFile {
 
    @Override
    public void renameTo(String newFileName) throws Exception {
-      synchronized (writeLock) {
+      synchronized (this) {
          try {
             dbDriver.renameFile(this, newFileName);
          } catch (SQLException e) {
@@ -451,7 +443,7 @@ public class JDBCSequentialFile implements SequentialFile {
    @Override
    public SequentialFile cloneFile() {
       try {
-         JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, 
filename, executor, scheduledExecutorService, syncDelay, dbDriver, writeLock);
+         JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, 
filename, executor, scheduledExecutorService, syncDelay, dbDriver);
          clone.setWritePosition(this.writePosition);
          return clone;
       } catch (Exception e) {
@@ -464,7 +456,7 @@ public class JDBCSequentialFile implements SequentialFile {
    public void copyTo(SequentialFile cloneFile) throws Exception {
       JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
       try {
-         synchronized (writeLock) {
+         synchronized (this) {
             logger.trace("JDBC Copying File.  From: {} To: {}", this, 
cloneFile);
             clone.open();
             dbDriver.copyFileData(this, clone);
diff --git 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index 77791f59f1..899924adc4 100644
--- 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++ 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -20,9 +20,6 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
@@ -32,11 +29,11 @@ import 
org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 public class JDBCSequentialFileFactory implements SequentialFileFactory, 
ActiveMQComponent {
 
@@ -44,14 +41,14 @@ public class JDBCSequentialFileFactory implements 
SequentialFileFactory, ActiveM
 
    private boolean started;
 
-   private final Set<JDBCSequentialFile> files = new ConcurrentHashSet<>();
-
    private final Executor executor;
 
-   private final Map<String, Object> fileLocks = new ConcurrentHashMap<>();
-
    private JDBCSequentialFileFactoryDriver dbDriver;
 
+   private volatile int countOpen = 0;
+
+   private static final AtomicIntegerFieldUpdater<JDBCSequentialFileFactory> 
countOpenUpdater = 
AtomicIntegerFieldUpdater.newUpdater(JDBCSequentialFileFactory.class, 
"countOpen");
+
    private final IOCriticalErrorListener criticalErrorListener;
 
    private final long syncDelay;
@@ -133,9 +130,8 @@ public class JDBCSequentialFileFactory implements 
SequentialFileFactory, ActiveM
    @Override
    public SequentialFile createSequentialFile(String fileName) {
       try {
-         fileLocks.putIfAbsent(fileName, new Object());
-         JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, 
executor, scheduledExecutorService, syncDelay, dbDriver, 
fileLocks.get(fileName));
-         files.add(file);
+         JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, 
executor, scheduledExecutorService, syncDelay, dbDriver);
+         countOpenUpdater.incrementAndGet(this);
          return file;
       } catch (Exception e) {
          criticalErrorListener.onIOException(e, "Error whilst creating JDBC 
file", null);
@@ -144,11 +140,11 @@ public class JDBCSequentialFileFactory implements 
SequentialFileFactory, ActiveM
    }
 
    public void sequentialFileClosed(SequentialFile file) {
-      files.remove(file);
+      countOpenUpdater.decrementAndGet(this);
    }
 
    public int getNumberOfOpenFiles() {
-      return files.size();
+      return countOpenUpdater.get(this);
    }
 
    @Override
@@ -261,13 +257,6 @@ public class JDBCSequentialFileFactory implements 
SequentialFileFactory, ActiveM
 
    @Override
    public void flush() {
-      for (SequentialFile file : files) {
-         try {
-            file.sync();
-         } catch (Exception e) {
-            criticalErrorListener.onIOException(e, "Error during JDBC file 
sync.", file.getFileName());
-         }
-      }
    }
 
    public synchronized void destroy() throws SQLException {
diff --git 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
index 05726feabc..c0e32a11e3 100644
--- 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
+++ 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
@@ -106,7 +106,7 @@ public class JDBCSequentialFileFactoryDriver extends 
AbstractJDBCDriver {
     * @throws SQLException
     */
    public void openFile(JDBCSequentialFile file) throws SQLException {
-      final long fileId = fileExists(file);
+      final long fileId = getFileID(file);
       if (fileId < 0) {
          createFile(file);
       } else {
@@ -126,7 +126,7 @@ public class JDBCSequentialFileFactoryDriver extends 
AbstractJDBCDriver {
     * @return
     * @throws SQLException
     */
-   public long fileExists(JDBCSequentialFile file) throws SQLException {
+   public long getFileID(JDBCSequentialFile file) throws SQLException {
       try (Connection connection = connectionProvider.getConnection()) {
          try (PreparedStatement selectFileByFileName = 
connection.prepareStatement(this.selectFileByFileName)) {
             connection.setAutoCommit(false);
diff --git 
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
 
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
index 65a42ad01a..fefcbed1ea 100644
--- 
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
+++ 
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
@@ -49,9 +49,12 @@ public class RealServerDatabasePagingTest extends 
ParameterDBTestBase {
 
    private static final int MAX_MESSAGES = 
Integer.parseInt(testProperty(TEST_NAME, "MAX_MESSAGES", "200"));
 
+   private static final int SOAK_MAX_MESSAGES = 
Integer.parseInt(testProperty(TEST_NAME, "SOAK_MAX_MESSAGES", "100000"));
+
    private static final int MESSAGE_SIZE = 
Integer.parseInt(testProperty(TEST_NAME, "MESSAGE_SIZE", "1000"));
+   private static final int SOAK_MESSAGE_SIZE = 
Integer.parseInt(testProperty(TEST_NAME, "SOAK_MESSAGE_SIZE", "1000"));
 
-   private static final int COMMIT_INTERVAL = 
Integer.parseInt(testProperty(TEST_NAME, "COMMIT_INTERVAL", "100"));
+   private static final int COMMIT_INTERVAL = 
Integer.parseInt(testProperty(TEST_NAME, "COMMIT_INTERVAL", "1000"));
 
    Process serverProcess;
 
@@ -69,29 +72,37 @@ public class RealServerDatabasePagingTest extends 
ParameterDBTestBase {
 
    @Test
    public void testPaging() throws Exception {
-      testPaging("CORE");
-      testPaging("AMQP");
-      testPaging("OPENWIRE");
+      testPaging("CORE", MAX_MESSAGES, MESSAGE_SIZE);
+      testPaging("AMQP", MAX_MESSAGES, MESSAGE_SIZE);
+      testPaging("OPENWIRE", MAX_MESSAGES, MESSAGE_SIZE);
+   }
+
+
+   @Test
+   public void testSoakPaging() throws Exception {
+      testPaging("AMQP", SOAK_MAX_MESSAGES, SOAK_MESSAGE_SIZE);
    }
 
-   public void testPaging(String protocol) throws Exception {
+   private void testPaging(String protocol, int messages, int messageSize) 
throws Exception {
       logger.info("performing paging test on protocol={} and db={}", protocol, 
database);
 
       final String queueName = "QUEUE_" + RandomUtil.randomString() + "_" + 
protocol + "_" + database;
 
       ConnectionFactory connectionFactory = 
CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
 
+      byte[] messageLoad = RandomUtil.randomBytes(messageSize);
+
       try (Connection connection = connectionFactory.createConnection()) {
-         byte[] messageLoad = new byte[MESSAGE_SIZE];
          Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
          Queue queue = session.createQueue(queueName);
          MessageProducer producer = session.createProducer(queue);
-         for (int i = 0; i < MAX_MESSAGES; i++) {
+         for (int i = 0; i < messages; i++) {
             BytesMessage message = session.createBytesMessage();
             message.writeBytes(messageLoad);
             message.setIntProperty("i", i);
             producer.send(message);
             if (i % COMMIT_INTERVAL == 0) {
+               logger.info("Sent {} messages", i);
                session.commit();
             }
          }
@@ -107,15 +118,23 @@ public class RealServerDatabasePagingTest extends 
ParameterDBTestBase {
 
       try (Connection connection = connectionFactory.createConnection()) {
          connection.start();
-         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
          Queue queue = session.createQueue(queueName);
          MessageConsumer consumer = session.createConsumer(queue);
-         for (int i = 0; i < MAX_MESSAGES; i++) {
+         for (int i = 0; i < messages; i++) {
             BytesMessage message = (BytesMessage) consumer.receive(5000);
             Assert.assertNotNull(message);
             Assert.assertEquals(i, message.getIntProperty("i"));
-            Assert.assertEquals(MESSAGE_SIZE, message.getBodyLength());
+            Assert.assertEquals(messageSize, message.getBodyLength());
+            byte[] bytesOutput = new byte[(int)message.getBodyLength()];
+            message.readBytes(bytesOutput);
+            Assert.assertArrayEquals(messageLoad, bytesOutput);
+            if (i % COMMIT_INTERVAL == 0) {
+               logger.info("Received {}", i);
+               session.commit();
+            }
          }
+         session.commit();
          Assert.assertNull(consumer.receiveNoWait());
       }
 

Reply via email to