This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new c5b81d929d ARTEMIS-4682 Improving performance of reloading JDBC storage
c5b81d929d is described below
commit c5b81d929dcfa4e7bdee98e1ca8f97b48ecbb396
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Mar 11 17:09:20 2024 -0400
ARTEMIS-4682 Improving performance of reloading JDBC storage
- fixing leaks
- page.exists() would lead to a select(all) and find.
---
.../jdbc/store/file/JDBCSequentialFile.java | 28 ++++++----------
.../jdbc/store/file/JDBCSequentialFileFactory.java | 29 +++++-----------
.../file/JDBCSequentialFileFactoryDriver.java | 4 +--
.../db/paging/RealServerDatabasePagingTest.java | 39 ++++++++++++++++------
4 files changed, 50 insertions(+), 50 deletions(-)
diff --git
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index a8f2b763c8..5289313ba3 100644
---
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -67,8 +67,6 @@ public class JDBCSequentialFile implements SequentialFile {
private final JDBCSequentialFileFactory fileFactory;
- private final Object writeLock;
-
private final JDBCSequentialFileFactoryDriver dbDriver;
MpscUnboundedArrayQueue<ScheduledWrite> writeQueue = new
MpscUnboundedArrayQueue<>(8192);
@@ -89,13 +87,11 @@ public class JDBCSequentialFile implements SequentialFile {
final Executor executor,
final ScheduledExecutorService scheduledExecutorService,
final long syncDelay,
- final JDBCSequentialFileFactoryDriver driver,
- final Object writeLock) throws SQLException {
+ final JDBCSequentialFileFactoryDriver driver) throws
SQLException {
this.fileFactory = fileFactory;
this.filename = filename;
this.extension = filename.contains(".") ?
filename.substring(filename.lastIndexOf(".") + 1, filename.length()) : "";
this.executor = executor;
- this.writeLock = writeLock;
this.dbDriver = driver;
this.scheduledExecutorService = scheduledExecutorService;
this.syncDelay = syncDelay;
@@ -113,14 +109,10 @@ public class JDBCSequentialFile implements SequentialFile
{
@Override
public boolean exists() {
- if (isLoaded.get()) return true;
try {
- return fileFactory.listFiles(extension).contains(filename);
- } catch (Exception e) {
- logger.debug(e.getMessage(), e);
- // this shouldn't throw a critical IO Error
- // as if the destination does not exists (ot table store removed),
the table will not exist and
- // we may get a SQL Exception
+ return dbDriver.getFileID(this) >= 0;
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
return false;
}
}
@@ -178,7 +170,7 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public void delete() throws IOException, InterruptedException,
ActiveMQException {
try {
- synchronized (writeLock) {
+ synchronized (this) {
if (load()) {
dbDriver.deleteFile(this);
}
@@ -193,7 +185,7 @@ public class JDBCSequentialFile implements SequentialFile {
private synchronized int jdbcWrite(byte[] data, IOCallback callback,
boolean append) {
try {
logger.debug("Writing {} bytes into {}", data.length, filename);
- synchronized (writeLock) {
+ synchronized (this) {
int noBytes = dbDriver.writeToFile(this, data, append);
seek(append ? writePosition + noBytes : noBytes);
if (logger.isTraceEnabled()) {
@@ -360,7 +352,7 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public synchronized int read(ByteBuffer bytes, final IOCallback callback)
throws SQLException {
- synchronized (writeLock) {
+ synchronized (this) {
try {
int read = dbDriver.readFromFile(this, bytes);
readPosition += read;
@@ -439,7 +431,7 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public void renameTo(String newFileName) throws Exception {
- synchronized (writeLock) {
+ synchronized (this) {
try {
dbDriver.renameFile(this, newFileName);
} catch (SQLException e) {
@@ -451,7 +443,7 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public SequentialFile cloneFile() {
try {
- JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory,
filename, executor, scheduledExecutorService, syncDelay, dbDriver, writeLock);
+ JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory,
filename, executor, scheduledExecutorService, syncDelay, dbDriver);
clone.setWritePosition(this.writePosition);
return clone;
} catch (Exception e) {
@@ -464,7 +456,7 @@ public class JDBCSequentialFile implements SequentialFile {
public void copyTo(SequentialFile cloneFile) throws Exception {
JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
try {
- synchronized (writeLock) {
+ synchronized (this) {
logger.trace("JDBC Copying File. From: {} To: {}", this,
cloneFile);
clone.open();
dbDriver.copyFileData(this, clone);
diff --git
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index 77791f59f1..899924adc4 100644
---
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -20,9 +20,6 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
@@ -32,11 +29,11 @@ import
org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public class JDBCSequentialFileFactory implements SequentialFileFactory,
ActiveMQComponent {
@@ -44,14 +41,14 @@ public class JDBCSequentialFileFactory implements
SequentialFileFactory, ActiveM
private boolean started;
- private final Set<JDBCSequentialFile> files = new ConcurrentHashSet<>();
-
private final Executor executor;
- private final Map<String, Object> fileLocks = new ConcurrentHashMap<>();
-
private JDBCSequentialFileFactoryDriver dbDriver;
+ private volatile int countOpen = 0;
+
+ private static final AtomicIntegerFieldUpdater<JDBCSequentialFileFactory>
countOpenUpdater =
AtomicIntegerFieldUpdater.newUpdater(JDBCSequentialFileFactory.class,
"countOpen");
+
private final IOCriticalErrorListener criticalErrorListener;
private final long syncDelay;
@@ -133,9 +130,8 @@ public class JDBCSequentialFileFactory implements
SequentialFileFactory, ActiveM
@Override
public SequentialFile createSequentialFile(String fileName) {
try {
- fileLocks.putIfAbsent(fileName, new Object());
- JDBCSequentialFile file = new JDBCSequentialFile(this, fileName,
executor, scheduledExecutorService, syncDelay, dbDriver,
fileLocks.get(fileName));
- files.add(file);
+ JDBCSequentialFile file = new JDBCSequentialFile(this, fileName,
executor, scheduledExecutorService, syncDelay, dbDriver);
+ countOpenUpdater.incrementAndGet(this);
return file;
} catch (Exception e) {
criticalErrorListener.onIOException(e, "Error whilst creating JDBC
file", null);
@@ -144,11 +140,11 @@ public class JDBCSequentialFileFactory implements
SequentialFileFactory, ActiveM
}
public void sequentialFileClosed(SequentialFile file) {
- files.remove(file);
+ countOpenUpdater.decrementAndGet(this);
}
public int getNumberOfOpenFiles() {
- return files.size();
+ return countOpenUpdater.get(this);
}
@Override
@@ -261,13 +257,6 @@ public class JDBCSequentialFileFactory implements
SequentialFileFactory, ActiveM
@Override
public void flush() {
- for (SequentialFile file : files) {
- try {
- file.sync();
- } catch (Exception e) {
- criticalErrorListener.onIOException(e, "Error during JDBC file
sync.", file.getFileName());
- }
- }
}
public synchronized void destroy() throws SQLException {
diff --git
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
index 05726feabc..c0e32a11e3 100644
---
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
+++
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
@@ -106,7 +106,7 @@ public class JDBCSequentialFileFactoryDriver extends
AbstractJDBCDriver {
* @throws SQLException
*/
public void openFile(JDBCSequentialFile file) throws SQLException {
- final long fileId = fileExists(file);
+ final long fileId = getFileID(file);
if (fileId < 0) {
createFile(file);
} else {
@@ -126,7 +126,7 @@ public class JDBCSequentialFileFactoryDriver extends
AbstractJDBCDriver {
* @return
* @throws SQLException
*/
- public long fileExists(JDBCSequentialFile file) throws SQLException {
+ public long getFileID(JDBCSequentialFile file) throws SQLException {
try (Connection connection = connectionProvider.getConnection()) {
try (PreparedStatement selectFileByFileName =
connection.prepareStatement(this.selectFileByFileName)) {
connection.setAutoCommit(false);
diff --git
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
index 65a42ad01a..fefcbed1ea 100644
---
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
+++
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java
@@ -49,9 +49,12 @@ public class RealServerDatabasePagingTest extends
ParameterDBTestBase {
private static final int MAX_MESSAGES =
Integer.parseInt(testProperty(TEST_NAME, "MAX_MESSAGES", "200"));
+ private static final int SOAK_MAX_MESSAGES =
Integer.parseInt(testProperty(TEST_NAME, "SOAK_MAX_MESSAGES", "100000"));
+
private static final int MESSAGE_SIZE =
Integer.parseInt(testProperty(TEST_NAME, "MESSAGE_SIZE", "1000"));
+ private static final int SOAK_MESSAGE_SIZE =
Integer.parseInt(testProperty(TEST_NAME, "SOAK_MESSAGE_SIZE", "1000"));
- private static final int COMMIT_INTERVAL =
Integer.parseInt(testProperty(TEST_NAME, "COMMIT_INTERVAL", "100"));
+ private static final int COMMIT_INTERVAL =
Integer.parseInt(testProperty(TEST_NAME, "COMMIT_INTERVAL", "1000"));
Process serverProcess;
@@ -69,29 +72,37 @@ public class RealServerDatabasePagingTest extends
ParameterDBTestBase {
@Test
public void testPaging() throws Exception {
- testPaging("CORE");
- testPaging("AMQP");
- testPaging("OPENWIRE");
+ testPaging("CORE", MAX_MESSAGES, MESSAGE_SIZE);
+ testPaging("AMQP", MAX_MESSAGES, MESSAGE_SIZE);
+ testPaging("OPENWIRE", MAX_MESSAGES, MESSAGE_SIZE);
+ }
+
+
+ @Test
+ public void testSoakPaging() throws Exception {
+ testPaging("AMQP", SOAK_MAX_MESSAGES, SOAK_MESSAGE_SIZE);
}
- public void testPaging(String protocol) throws Exception {
+ private void testPaging(String protocol, int messages, int messageSize)
throws Exception {
logger.info("performing paging test on protocol={} and db={}", protocol,
database);
final String queueName = "QUEUE_" + RandomUtil.randomString() + "_" +
protocol + "_" + database;
ConnectionFactory connectionFactory =
CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+ byte[] messageLoad = RandomUtil.randomBytes(messageSize);
+
try (Connection connection = connectionFactory.createConnection()) {
- byte[] messageLoad = new byte[MESSAGE_SIZE];
Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
- for (int i = 0; i < MAX_MESSAGES; i++) {
+ for (int i = 0; i < messages; i++) {
BytesMessage message = session.createBytesMessage();
message.writeBytes(messageLoad);
message.setIntProperty("i", i);
producer.send(message);
if (i % COMMIT_INTERVAL == 0) {
+ logger.info("Sent {} messages", i);
session.commit();
}
}
@@ -107,15 +118,23 @@ public class RealServerDatabasePagingTest extends
ParameterDBTestBase {
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
- Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
- for (int i = 0; i < MAX_MESSAGES; i++) {
+ for (int i = 0; i < messages; i++) {
BytesMessage message = (BytesMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getIntProperty("i"));
- Assert.assertEquals(MESSAGE_SIZE, message.getBodyLength());
+ Assert.assertEquals(messageSize, message.getBodyLength());
+ byte[] bytesOutput = new byte[(int)message.getBodyLength()];
+ message.readBytes(bytesOutput);
+ Assert.assertArrayEquals(messageLoad, bytesOutput);
+ if (i % COMMIT_INTERVAL == 0) {
+ logger.info("Received {}", i);
+ session.commit();
+ }
}
+ session.commit();
Assert.assertNull(consumer.receiveNoWait());
}