This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new d5a76a27cc ARTEMIS-5829 Cleanup Postgres pg_largeobjects
d5a76a27cc is described below
commit d5a76a27ccd4d04950bfba9460f0fc4d813964e2
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Dec 18 10:03:10 2025 -0500
ARTEMIS-5829 Cleanup Postgres pg_largeobjects
When paging or large messages are used, the corresponding pg_largeobject
records should be cleaned up.
Large objects are a PostgreSQL feature that the FileDriver uses when
running on PostgreSQL.
---
.../jdbc/store/file/JDBCSequentialFile.java | 17 +++++++++
.../store/file/PostgresLargeObjectManager.java | 18 ++++++++++
.../PostgresSequentialSequentialFileDriver.java | 42 +++++++++++++++++++---
.../tests/db/common/ParameterDBTestBase.java | 32 +++++++++++++++++
.../RealServerDatabaseLargeMessageTest.java | 7 ++++
.../artemis/tests/db/paging/PagingTest.java | 6 ++--
.../db/paging/RealServerDatabasePagingTest.java | 18 +++++++++-
7 files changed, 131 insertions(+), 9 deletions(-)
diff --git
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 6af1308ff4..f06c5f2d21 100644
---
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -58,8 +58,13 @@ public class JDBCSequentialFile implements SequentialFile {
private AtomicBoolean isLoaded = new AtomicBoolean(false);
+ private volatile boolean deleted = false;
+
private long id = -1;
+ // used in postgres
+ private long oid = -1;
+
private long readPosition = 0;
private long writePosition = 0;
@@ -172,9 +177,13 @@ public class JDBCSequentialFile implements SequentialFile {
public void delete() throws IOException, InterruptedException,
ActiveMQException {
try {
synchronized (this) {
+ if (deleted) {
+ return;
+ }
if (load()) {
dbDriver.deleteFile(this);
}
+ deleted = true;
}
} catch (SQLException e) {
// file is already gone from a drop somewhere
@@ -473,6 +482,14 @@ public class JDBCSequentialFile implements SequentialFile {
return id;
}
+ public long getOid() {
+ return oid;
+ }
+
+ public void setOid(long oid) {
+ this.oid = oid;
+ }
+
public void setId(long id) {
this.id = id;
}
diff --git
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java
index fa5e67793d..c81da2b28c 100644
---
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java
+++
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java
@@ -23,6 +23,7 @@ import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject;
+import org.postgresql.largeobject.LargeObjectManager;
/**
* Helper class for when the PostgreSQL driver is not directly available.
@@ -70,6 +71,23 @@ public class PostgresLargeObjectManager {
}
}
+ public final void deleteLO(Connection connection, long oid) throws
SQLException {
+ Object largeObjectManager = getLargeObjectManager(connection);
+ if (shouldUseReflection) {
+ try {
+ Method method = largeObjectManager.getClass().getMethod("delete",
long.class);
+ method.invoke(largeObjectManager, oid);
+ } catch (Exception ex) {
+ throw new SQLException("Couldn't access
org.postgresql.largeobject.LargeObjectManager", ex);
+ }
+ } else {
+ if (largeObjectManager != null) {
+ ((LargeObjectManager) largeObjectManager).delete(oid);
+ }
+ }
+
+ }
+
public Object open(Connection connection, long oid, int mode) throws
SQLException {
if (shouldUseReflection) {
Object largeObjectManager = getLargeObjectManager(connection);
diff --git
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
index 7bfadcd780..e4e60222a3 100644
---
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
+++
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.jdbc.store.file;
+import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -25,10 +26,14 @@ import java.sql.Statement;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@SuppressWarnings("SynchronizeOnNonFinalField")
public final class PostgresSequentialSequentialFileDriver extends
JDBCSequentialFileFactoryDriver {
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY";
private PostgresLargeObjectManager largeObjectManager;
@@ -62,6 +67,11 @@ public final class PostgresSequentialSequentialFileDriver
extends JDBCSequential
try (PreparedStatement createFile =
connection.prepareStatement(this.createFile, this.createFileAutogeneratedKeys))
{
connection.setAutoCommit(false);
Long oid = largeObjectManager.createLO(connection);
+ file.setOid(oid);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Creating file {} with oid={}",
file.getFileName(), oid);
+ }
createFile.setString(1, file.getFileName());
createFile.setString(2, file.getExtension());
@@ -89,7 +99,7 @@ public final class PostgresSequentialSequentialFileDriver
extends JDBCSequential
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
- file.setWritePosition(getPostGresLargeObjectSize(file));
+ file.setWritePosition(getPostgresLargeObjectSize(file));
}
connection.commit();
} catch (SQLException e) {
@@ -152,6 +162,9 @@ public final class PostgresSequentialSequentialFileDriver
extends JDBCSequential
}
private Long getOID(JDBCSequentialFile file) throws SQLException {
+ if (file.getOid() != -1L) {
+ return file.getOid();
+ }
Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
if (oid == null) {
try (Connection connection = connectionProvider.getConnection()) {
@@ -170,13 +183,14 @@ public final class PostgresSequentialSequentialFileDriver
extends JDBCSequential
}
}
}
- if ((Long) file.getMetaData(POSTGRES_OID_KEY) == 0) {
- System.out.println("FD");
+ Long value = (Long) file.getMetaData(POSTGRES_OID_KEY);
+ if (value != null) {
+ file.setOid(value);
}
- return (Long) file.getMetaData(POSTGRES_OID_KEY);
+ return value;
}
- private int getPostGresLargeObjectSize(JDBCSequentialFile file) throws
SQLException {
+ private int getPostgresLargeObjectSize(JDBCSequentialFile file) throws
SQLException {
int size = 0;
Long oid = getOID(file);
if (oid != null) {
@@ -195,4 +209,22 @@ public final class PostgresSequentialSequentialFileDriver
extends JDBCSequential
}
return size;
}
+
+ @Override
+ public void deleteFile(JDBCSequentialFile file) throws SQLException {
+ Long oid = getOID(file);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Deleting file {} with oid={}", file.getFileName(), oid);
+ }
+ if (oid != null) {
+ try (Connection connection = connectionProvider.getConnection()) {
+ largeObjectManager.deleteLO(connection, oid);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ super.deleteFile(file);
+ }
+
+
}
diff --git
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/common/ParameterDBTestBase.java
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/common/ParameterDBTestBase.java
index d6c65094f6..d572c25c29 100644
---
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/common/ParameterDBTestBase.java
+++
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/common/ParameterDBTestBase.java
@@ -28,11 +28,13 @@ import
org.apache.activemq.artemis.core.config.Configuration;
import
org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter;
import
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
@ExtendWith(ParameterizedTestExtension.class)
public abstract class ParameterDBTestBase extends DBTestBase {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -47,6 +49,27 @@ public abstract class ParameterDBTestBase extends DBTestBase
{
return database.getJdbcURI();
}
+ int postgres_loCount;
+
+ protected int countPostgresLargeObjects() {
+ try (java.sql.Connection jdbcConnection = database.getConnection()) {
+ ResultSet resultSet =
jdbcConnection.createStatement().executeQuery("select count(*) from
pg_largeobject");
+ resultSet.next();
+ return resultSet.getInt(1);
+ } catch (Exception e) {
+ // Not throwing a SQLException because this method is used in a Supplier<String>;
+ // however, if an exception does happen, the test should still receive an error.
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+ public void prepareCheckPostgres() throws Exception {
+ postgres_loCount = countPostgresLargeObjects();
+ }
+
+ public void checkPostgres() throws Exception {
+ Wait.assertTrue(() -> "There are still " + countPostgresLargeObjects() +
" pg_largeObject records", () -> countPostgresLargeObjects() - postgres_loCount
< 10, 60_000, 100);
+ }
+
@Parameter(index = 0)
public Database database;
@@ -99,6 +122,15 @@ public abstract class ParameterDBTestBase extends
DBTestBase {
}
public int dropDatabase() {
+ if (database == Database.POSTGRES) {
+ try (Connection connection = getConnection()) {
+ logger.info("Removing large objects on postgres");
+ connection.createStatement().execute("DELETE FROM pg_largeobject");
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
return switch (database) {
case JOURNAL -> 0;
case DERBY -> {
diff --git
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java
index 5dbb78be0d..f3cda9b281 100644
---
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java
+++
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java
@@ -89,6 +89,9 @@ public class RealServerDatabaseLargeMessageTest extends
ParameterDBTestBase {
}
public void testLargeMessage(String protocol) throws Exception {
+ if (database == Database.POSTGRES) {
+ prepareCheckPostgres();
+ }
logger.info("testLargeMessage({})", protocol);
final String queueName = "QUEUE_" + RandomUtil.randomUUIDString() + "_"
+ protocol + "_" + database;
@@ -157,6 +160,10 @@ public class RealServerDatabaseLargeMessageTest extends
ParameterDBTestBase {
assertTrue(done.await(120, TimeUnit.SECONDS));
assertEquals(0, errors.get());
+ if (database == Database.POSTGRES) {
+ checkPostgres();
+ }
+
}
}
diff --git
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
index 0f65518cd1..d2a1535dca 100644
---
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
+++
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
@@ -170,9 +170,9 @@ public class PagingTest extends ParameterDBTestBase {
List<Object[]> databases = new ArrayList<>();
databases.add(new Object[] {Database.JOURNAL, true});
databases.add(new Object[] {Database.JOURNAL, false});
- Database database = Database.randomDB();
- if (database != null) {
- databases.add(new Object[]{database, false});
+ List<Database> selectedList = Database.selectedList();
+ if (selectedList != null && !selectedList.isEmpty()) {
+ selectedList.forEach(d -> databases.add(new Object[]{d, false}));
}
return databases;
}
diff --git
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
index 6ff002bb1d..9389e5f018 100644
---
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
+++
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
@@ -54,11 +54,13 @@ public class RealServerDatabasePagingTest extends
ParameterDBTestBase {
private static final String TEST_NAME = "PGDB";
- private static final int MAX_MESSAGES =
Integer.parseInt(testProperty(TEST_NAME, "MAX_MESSAGES", "200"));
+ private static final int MAX_MESSAGES =
Integer.parseInt(testProperty(TEST_NAME, "MAX_MESSAGES", "1000"));
+ private static final int MAX_LARGE_MESSAGES =
Integer.parseInt(testProperty(TEST_NAME, "MAX_LARGE_MESSAGES", "200"));
private static final int SOAK_MAX_MESSAGES =
Integer.parseInt(testProperty(TEST_NAME, "SOAK_MAX_MESSAGES", "100000"));
private static final int MESSAGE_SIZE =
Integer.parseInt(testProperty(TEST_NAME, "MESSAGE_SIZE", "1000"));
+ private static final int LARGE_MESSAGE_SIZE =
Integer.parseInt(testProperty(TEST_NAME, "LARGE_MESSAGE_SIZE", "500000"));
private static final int SOAK_MESSAGE_SIZE =
Integer.parseInt(testProperty(TEST_NAME, "SOAK_MESSAGE_SIZE", "1000"));
private static final int COMMIT_INTERVAL =
Integer.parseInt(testProperty(TEST_NAME, "COMMIT_INTERVAL", "1000"));
@@ -85,6 +87,12 @@ public class RealServerDatabasePagingTest extends
ParameterDBTestBase {
}
+ @TestTemplate
+ public void testPagingWithLargeMessages() throws Exception {
+ testPaging("CORE", MAX_LARGE_MESSAGES, LARGE_MESSAGE_SIZE);
+ testPaging("AMQP", MAX_LARGE_MESSAGES, LARGE_MESSAGE_SIZE);
+ }
+
@TestTemplate
public void testSoakPaging() throws Exception {
testPaging("AMQP", SOAK_MAX_MESSAGES, SOAK_MESSAGE_SIZE);
@@ -93,6 +101,10 @@ public class RealServerDatabasePagingTest extends
ParameterDBTestBase {
private void testPaging(String protocol, int messages, int messageSize)
throws Exception {
logger.info("performing paging test on protocol={} and db={}", protocol,
database);
+ if (database == Database.POSTGRES) {
+ prepareCheckPostgres();
+ }
+
final String queueName = "QUEUE_" + RandomUtil.randomUUIDString() + "_"
+ protocol + "_" + database;
ConnectionFactory connectionFactory =
CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
@@ -144,6 +156,10 @@ public class RealServerDatabasePagingTest extends
ParameterDBTestBase {
assertNull(consumer.receiveNoWait());
}
+ if (database == Database.POSTGRES) {
+ checkPostgres();
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]