This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new d5a76a27cc ARTEMIS-5829 Cleanup Postgres pg_largeobjects
d5a76a27cc is described below

commit d5a76a27ccd4d04950bfba9460f0fc4d813964e2
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Dec 18 10:03:10 2025 -0500

    ARTEMIS-5829 Cleanup Postgres pg_largeobjects
    
    When paging or large message is used,
    pg_largeobjects should be cleared.
    
    this is a feature from Postgres that the FileDriver is using on
    postgresql
---
 .../jdbc/store/file/JDBCSequentialFile.java        | 17 +++++++++
 .../store/file/PostgresLargeObjectManager.java     | 18 ++++++++++
 .../PostgresSequentialSequentialFileDriver.java    | 42 +++++++++++++++++++---
 .../tests/db/common/ParameterDBTestBase.java       | 32 +++++++++++++++++
 .../RealServerDatabaseLargeMessageTest.java        |  7 ++++
 .../artemis/tests/db/paging/PagingTest.java        |  6 ++--
 .../db/paging/RealServerDatabasePagingTest.java    | 18 +++++++++-
 7 files changed, 131 insertions(+), 9 deletions(-)

diff --git 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 6af1308ff4..f06c5f2d21 100644
--- 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -58,8 +58,13 @@ public class JDBCSequentialFile implements SequentialFile {
 
    private AtomicBoolean isLoaded = new AtomicBoolean(false);
 
+   private volatile boolean deleted = false;
+
    private long id = -1;
 
+   // used in postgres
+   private long oid = -1;
+
    private long readPosition = 0;
 
    private long writePosition = 0;
@@ -172,9 +177,13 @@ public class JDBCSequentialFile implements SequentialFile {
    public void delete() throws IOException, InterruptedException, 
ActiveMQException {
       try {
          synchronized (this) {
+            if (deleted) {
+               return;
+            }
             if (load()) {
                dbDriver.deleteFile(this);
             }
+            deleted = true;
          }
       } catch (SQLException e) {
          // file is already gone from a drop somewhere
@@ -473,6 +482,14 @@ public class JDBCSequentialFile implements SequentialFile {
       return id;
    }
 
+   public long getOid() {
+      return oid;
+   }
+
+   public void setOid(long oid) {
+      this.oid = oid;
+   }
+
    public void setId(long id) {
       this.id = id;
    }
diff --git 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java
 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java
index fa5e67793d..c81da2b28c 100644
--- 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java
+++ 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java
@@ -23,6 +23,7 @@ import java.sql.SQLException;
 
 import org.postgresql.PGConnection;
 import org.postgresql.largeobject.LargeObject;
+import org.postgresql.largeobject.LargeObjectManager;
 
 /**
  * Helper class for when the postresql driver is not directly availalbe.
@@ -70,6 +71,23 @@ public class PostgresLargeObjectManager {
       }
    }
 
+   public final void deleteLO(Connection connection, long oid) throws 
SQLException {
+      Object largeObjectManager = getLargeObjectManager(connection);
+      if (shouldUseReflection) {
+         try {
+            Method method = largeObjectManager.getClass().getMethod("delete", 
long.class);
+            method.invoke(largeObjectManager, oid);
+         } catch (Exception ex) {
+            throw new SQLException("Couldn't access 
org.postgresql.largeobject.LargeObjectManager", ex);
+         }
+      } else {
+         if (largeObjectManager != null) {
+            ((LargeObjectManager) largeObjectManager).delete(oid);
+         }
+      }
+
+   }
+
    public Object open(Connection connection, long oid, int mode) throws 
SQLException {
       if (shouldUseReflection) {
          Object largeObjectManager = getLargeObjectManager(connection);
diff --git 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
index 7bfadcd780..e4e60222a3 100644
--- 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
+++ 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.jdbc.store.file;
 
+import java.lang.invoke.MethodHandles;
 import java.nio.ByteBuffer;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -25,10 +26,14 @@ import java.sql.Statement;
 
 import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("SynchronizeOnNonFinalField")
 public final class PostgresSequentialSequentialFileDriver extends 
JDBCSequentialFileFactoryDriver {
 
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY";
    private PostgresLargeObjectManager largeObjectManager;
 
@@ -62,6 +67,11 @@ public final class PostgresSequentialSequentialFileDriver 
extends JDBCSequential
          try (PreparedStatement createFile = 
connection.prepareStatement(this.createFile, this.createFileAutogeneratedKeys)) 
{
             connection.setAutoCommit(false);
             Long oid = largeObjectManager.createLO(connection);
+            file.setOid(oid);
+
+            if (logger.isDebugEnabled()) {
+               logger.debug("Creating file {} with oid={}", 
file.getFileName(), oid);
+            }
 
             createFile.setString(1, file.getFileName());
             createFile.setString(2, file.getExtension());
@@ -89,7 +99,7 @@ public final class PostgresSequentialSequentialFileDriver 
extends JDBCSequential
 
             try (ResultSet rs = readLargeObject.executeQuery()) {
                if (rs.next()) {
-                  file.setWritePosition(getPostGresLargeObjectSize(file));
+                  file.setWritePosition(getPostgresLargeObjectSize(file));
                }
                connection.commit();
             } catch (SQLException e) {
@@ -152,6 +162,9 @@ public final class PostgresSequentialSequentialFileDriver 
extends JDBCSequential
    }
 
    private Long getOID(JDBCSequentialFile file) throws SQLException {
+      if (file.getOid() != -1L) {
+         return file.getOid();
+      }
       Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
       if (oid == null) {
          try (Connection connection = connectionProvider.getConnection()) {
@@ -170,13 +183,14 @@ public final class PostgresSequentialSequentialFileDriver 
extends JDBCSequential
             }
          }
       }
-      if ((Long) file.getMetaData(POSTGRES_OID_KEY) == 0) {
-         System.out.println("FD");
+      Long value = (Long) file.getMetaData(POSTGRES_OID_KEY);
+      if (value != null) {
+         file.setOid(value);
       }
-      return (Long) file.getMetaData(POSTGRES_OID_KEY);
+      return value;
    }
 
-   private int getPostGresLargeObjectSize(JDBCSequentialFile file) throws 
SQLException {
+   private int getPostgresLargeObjectSize(JDBCSequentialFile file) throws 
SQLException {
       int size = 0;
       Long oid = getOID(file);
       if (oid != null) {
@@ -195,4 +209,22 @@ public final class PostgresSequentialSequentialFileDriver 
extends JDBCSequential
       }
       return size;
    }
+
+   @Override
+   public void deleteFile(JDBCSequentialFile file) throws SQLException {
+      Long oid = getOID(file);
+      if (logger.isDebugEnabled()) {
+         logger.debug("Deleting file {} with oid={}", file.getFileName(), oid);
+      }
+      if (oid != null) {
+         try (Connection connection = connectionProvider.getConnection()) {
+            largeObjectManager.deleteLO(connection, oid);
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         }
+      }
+      super.deleteFile(file);
+   }
+
+
 }
diff --git 
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/common/ParameterDBTestBase.java
 
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/common/ParameterDBTestBase.java
index d6c65094f6..d572c25c29 100644
--- 
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/common/ParameterDBTestBase.java
+++ 
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/common/ParameterDBTestBase.java
@@ -28,11 +28,13 @@ import 
org.apache.activemq.artemis.core.config.Configuration;
 import 
org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
 import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter;
 import 
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.activemq.artemis.utils.Wait;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 @ExtendWith(ParameterizedTestExtension.class)
 public abstract class ParameterDBTestBase extends DBTestBase {
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -47,6 +49,27 @@ public abstract class ParameterDBTestBase extends DBTestBase 
{
       return database.getJdbcURI();
    }
 
+   int postgres_loCount;
+
+   protected int countPostgresLargeObjects() {
+      try (java.sql.Connection jdbcConnection = database.getConnection()) {
+         ResultSet resultSet = 
jdbcConnection.createStatement().executeQuery("select count(*) from 
pg_largeobject");
+         resultSet.next();
+         return resultSet.getInt(1);
+      } catch (Exception e) {
+         // not throwing a SQLException as this method is used on 
Supplier<String>
+         // however if an exception still happened, the test should receive an 
error
+         throw new RuntimeException(e.getMessage(), e);
+      }
+   }
+   public void prepareCheckPostgres() throws Exception {
+      postgres_loCount = countPostgresLargeObjects();
+   }
+
+   public void checkPostgres() throws Exception {
+      Wait.assertTrue(() -> "There are still " + countPostgresLargeObjects() + 
" pg_largeObject records", () -> countPostgresLargeObjects() - postgres_loCount 
< 10, 60_000, 100);
+   }
+
 
    @Parameter(index = 0)
    public Database database;
@@ -99,6 +122,15 @@ public abstract class ParameterDBTestBase extends 
DBTestBase {
    }
 
    public int dropDatabase() {
+      if (database == Database.POSTGRES) {
+         try (Connection connection = getConnection()) {
+            logger.info("Removing large objects on postgres");
+            connection.createStatement().execute("DELETE FROM pg_largeobject");
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         }
+      }
+
       return switch (database) {
          case JOURNAL -> 0;
          case DERBY -> {
diff --git 
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java
 
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java
index 5dbb78be0d..f3cda9b281 100644
--- 
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java
+++ 
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java
@@ -89,6 +89,9 @@ public class RealServerDatabaseLargeMessageTest extends 
ParameterDBTestBase {
    }
 
    public void testLargeMessage(String protocol) throws Exception {
+      if (database == Database.POSTGRES) {
+         prepareCheckPostgres();
+      }
       logger.info("testLargeMessage({})", protocol);
       final String queueName = "QUEUE_" + RandomUtil.randomUUIDString() + "_" 
+ protocol + "_" + database;
 
@@ -157,6 +160,10 @@ public class RealServerDatabaseLargeMessageTest extends 
ParameterDBTestBase {
       assertTrue(done.await(120, TimeUnit.SECONDS));
       assertEquals(0, errors.get());
 
+      if (database == Database.POSTGRES) {
+         checkPostgres();
+      }
+
    }
 
 }
diff --git 
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
 
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
index 0f65518cd1..d2a1535dca 100644
--- 
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
+++ 
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
@@ -170,9 +170,9 @@ public class PagingTest extends ParameterDBTestBase {
       List<Object[]> databases = new ArrayList<>();
       databases.add(new Object[] {Database.JOURNAL, true});
       databases.add(new Object[] {Database.JOURNAL, false});
-      Database database = Database.randomDB();
-      if (database != null) {
-         databases.add(new Object[]{database, false});
+      List<Database> selectedList = Database.selectedList();
+      if (selectedList != null && !selectedList.isEmpty()) {
+         selectedList.forEach(d -> databases.add(new Object[]{d, false}));
       }
       return databases;
    }
diff --git 
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
 
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
index 6ff002bb1d..9389e5f018 100644
--- 
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
+++ 
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
@@ -54,11 +54,13 @@ public class RealServerDatabasePagingTest extends 
ParameterDBTestBase {
 
    private static final String TEST_NAME = "PGDB";
 
-   private static final int MAX_MESSAGES = 
Integer.parseInt(testProperty(TEST_NAME, "MAX_MESSAGES", "200"));
+   private static final int MAX_MESSAGES = 
Integer.parseInt(testProperty(TEST_NAME, "MAX_MESSAGES", "1000"));
+   private static final int MAX_LARGE_MESSAGES = 
Integer.parseInt(testProperty(TEST_NAME, "MAX_LARGE_MESSAGES", "200"));
 
    private static final int SOAK_MAX_MESSAGES = 
Integer.parseInt(testProperty(TEST_NAME, "SOAK_MAX_MESSAGES", "100000"));
 
    private static final int MESSAGE_SIZE = 
Integer.parseInt(testProperty(TEST_NAME, "MESSAGE_SIZE", "1000"));
+   private static final int LARGE_MESSAGE_SIZE = 
Integer.parseInt(testProperty(TEST_NAME, "LARGE_MESSAGE_SIZE", "500000"));
    private static final int SOAK_MESSAGE_SIZE = 
Integer.parseInt(testProperty(TEST_NAME, "SOAK_MESSAGE_SIZE", "1000"));
 
    private static final int COMMIT_INTERVAL = 
Integer.parseInt(testProperty(TEST_NAME, "COMMIT_INTERVAL", "1000"));
@@ -85,6 +87,12 @@ public class RealServerDatabasePagingTest extends 
ParameterDBTestBase {
    }
 
 
+   @TestTemplate
+   public void testPagingWithLargeMessages() throws Exception {
+      testPaging("CORE", MAX_LARGE_MESSAGES, LARGE_MESSAGE_SIZE);
+      testPaging("AMQP", MAX_LARGE_MESSAGES, LARGE_MESSAGE_SIZE);
+   }
+
    @TestTemplate
    public void testSoakPaging() throws Exception {
       testPaging("AMQP", SOAK_MAX_MESSAGES, SOAK_MESSAGE_SIZE);
@@ -93,6 +101,10 @@ public class RealServerDatabasePagingTest extends 
ParameterDBTestBase {
    private void testPaging(String protocol, int messages, int messageSize) 
throws Exception {
       logger.info("performing paging test on protocol={} and db={}", protocol, 
database);
 
+      if (database == Database.POSTGRES) {
+         prepareCheckPostgres();
+      }
+
       final String queueName = "QUEUE_" + RandomUtil.randomUUIDString() + "_" 
+ protocol + "_" + database;
 
       ConnectionFactory connectionFactory = 
CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
@@ -144,6 +156,10 @@ public class RealServerDatabasePagingTest extends 
ParameterDBTestBase {
          assertNull(consumer.receiveNoWait());
       }
 
+      if (database == Database.POSTGRES) {
+         checkPostgres();
+      }
+
 
    }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to