Repository: activemq-artemis Updated Branches: refs/heads/1.x c35960f6a -> ec085b8ea
ARTEMIS-1115 Call CriticalIOListener on JDBC Error Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2ccc4e14 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2ccc4e14 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2ccc4e14 Branch: refs/heads/1.x Commit: 2ccc4e14f1ed55f98a30c051dbb057ad35a005a2 Parents: c35960f Author: Martyn Taylor <[email protected]> Authored: Mon Apr 17 10:40:26 2017 +0100 Committer: Clebert Suconic <[email protected]> Committed: Wed Apr 19 00:35:48 2017 -0400 ---------------------------------------------------------------------- .../jdbc/store/file/JDBCSequentialFile.java | 80 +++++++++++++------- .../store/file/JDBCSequentialFileFactory.java | 70 +++++++++++++---- .../file/JDBCSequentialFileFactoryDriver.java | 8 +- .../jdbc/store/journal/JDBCJournalImpl.java | 75 +++++++++--------- .../jdbc/store/journal/JDBCJournalRecord.java | 6 +- .../file/JDBCSequentialFileFactoryTest.java | 7 +- .../paging/impl/PagingStoreFactoryDatabase.java | 7 +- .../impl/journal/JDBCJournalStorageManager.java | 12 +-- .../jdbc/store/journal/JDBCJournalTest.java | 9 ++- 9 files changed, 187 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ccc4e14/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java index 7e72785..e2da151 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java @@ -93,18 +93,23 @@ public class JDBCSequentialFile implements SequentialFile { return fileFactory.listFiles(extension).contains(filename); } catch (Exception e) { logger.warn(e.getMessage(), e); + fileFactory.onIOError(e, "Error checking JDBC file exists.", this); return false; } } @Override public synchronized void open() throws Exception { - if (!isOpen) { - synchronized (writeLock) { - dbDriver.openFile(this); - isCreated = true; - isOpen = true; + try { + if (!isOpen) { + synchronized (writeLock) { + dbDriver.openFile(this); + isCreated = true; + isOpen = true; + } } + } catch (SQLException e) { + fileFactory.onIOError(e, "Error attempting to open JDBC file.", this); } } @@ -142,34 +147,35 @@ public class JDBCSequentialFile implements SequentialFile { } } } catch (SQLException e) { - throw new ActiveMQException(ActiveMQExceptionType.IO_ERROR, e.getMessage(), e); + fileFactory.onIOError(e, "Error deleting JDBC file.", this); } } - private synchronized int internalWrite(byte[] data, IOCallback callback) { + private synchronized int internalWrite(byte[] data, IOCallback callback) throws Exception { try { synchronized (writeLock) { int noBytes = dbDriver.writeToFile(this, data); seek(noBytes); + System.out.println("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size()); if (callback != null) callback.done(); return noBytes; } } catch (Exception e) { - logger.warn("Failed to write to file", e.getMessage(), e); if (callback != null) - callback.onError(-1, e.getMessage()); + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); + fileFactory.onIOError(e, "Error writing to JDBC file.", this); } - return -1; + return 0; } - public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) { + public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) throws Exception { byte[] data = new byte[buffer.readableBytes()]; buffer.readBytes(data); return internalWrite(data, callback); } - private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) { + private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) throws Exception { return internalWrite(buffer.array(), callback); } @@ -177,16 +183,27 @@ public class JDBCSequentialFile implements SequentialFile { executor.execute(new Runnable() { @Override public void run() { - internalWrite(bytes, callback); + try { + internalWrite(bytes, callback); + } catch (Exception e) { + logger.error(e); + // internalWrite will notify the CriticalIOErrorListener + } } }); } private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) { + final SequentialFile file = this; executor.execute(new Runnable() { @Override public void run() { - internalWrite(bytes, callback); + try { + internalWrite(bytes, callback); + } catch (Exception e) { + logger.error(e); + fileFactory.onIOError(e, "Error on JDBC file sync", file); + } } }); } @@ -226,7 +243,8 @@ public class JDBCSequentialFile implements SequentialFile { scheduleWrite(bytes, waitIOCallback); waitIOCallback.waitCompletion(); } catch (Exception e) { - waitIOCallback.onError(-1, e.getMessage()); + waitIOCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error writing to JDBC file."); + fileFactory.onIOError(e, "Failed to write to file.", this); } } else { scheduleWrite(bytes, callback); @@ -249,12 +267,12 @@ public class JDBCSequentialFile implements SequentialFile { if (callback != null) callback.done(); return read; - } catch (Exception e) { + } catch (SQLException e) { if (callback != null) - callback.onError(-1, e.getMessage()); - logger.warn("Failed to read from file", e.getMessage(), e); - return 0; + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); + fileFactory.onIOError(e, "Error reading from JDBC file.", this); } + return 0; } } @@ -291,7 +309,8 @@ public class JDBCSequentialFile implements SequentialFile { try { callback.waitCompletion(); } catch (Exception e) { - throw new IOException(e); + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "Error during JDBC file sync."); + fileFactory.onIOError(e, "Error during JDBC file sync.", this); } } @@ -303,7 +322,11 @@ public class JDBCSequentialFile implements SequentialFile { @Override public void renameTo(String newFileName) throws Exception { synchronized (writeLock) { - dbDriver.renameFile(this, newFileName); + try { + dbDriver.renameFile(this, newFileName); + } catch (SQLException e) { + fileFactory.onIOError(e, "Error renaming JDBC file.", this); + } } } @@ -313,18 +336,21 @@ public class JDBCSequentialFile implements SequentialFile { JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock); return clone; } catch (Exception e) { - logger.error("Error cloning file: " + filename, e); - return null; + fileFactory.onIOError(e, "Error cloning JDBC file.", this); } + return null; } @Override public void copyTo(SequentialFile cloneFile) throws Exception { JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile; - clone.open(); - - synchronized (writeLock) { - dbDriver.copyFileData(this, clone); + try { + synchronized (writeLock) { + clone.open(); + dbDriver.copyFileData(this, clone); + } + } catch (Exception e) { + fileFactory.onIOError(e, "Error copying JDBC file.", this); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ccc4e14/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java index fa88a85..d5a92a2 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -27,15 +27,19 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.jboss.logging.Logger; public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent { + private static final Logger logger = Logger.getLogger(JDBCSequentialFile.class); + private boolean started; private final List<JDBCSequentialFile> files = new ArrayList<>(); @@ -44,28 +48,53 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM private final Map<String, Object> fileLocks = new HashMap<>(); - private final JDBCSequentialFileFactoryDriver dbDriver; + private JDBCSequentialFileFactoryDriver dbDriver; + + private final IOCriticalErrorListener criticalErrorListener; public JDBCSequentialFileFactory(final DataSource dataSource, final SQLProvider sqlProvider, - Executor executor) throws Exception { + Executor executor, + IOCriticalErrorListener criticalErrorListener) throws Exception { + this.executor = executor; - dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider); + this.criticalErrorListener = criticalErrorListener; + + try { + this.dbDriver = JDBCFileUtils.getDBFileDriver(dataSource, sqlProvider); + } catch (SQLException e) { + criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null); + } + } public JDBCSequentialFileFactory(final String connectionUrl, final String className, final SQLProvider sqlProvider, - Executor executor) throws Exception { + Executor executor, + IOCriticalErrorListener criticalErrorListener) throws Exception { this.executor = executor; - dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider); + this.criticalErrorListener = criticalErrorListener; + try { + this.dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider); + } catch (SQLException e) { + criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null); + } + } public JDBCSequentialFileFactory(final Connection connection, final SQLProvider sqlProvider, - final Executor executor) throws Exception { + final Executor executor, + final IOCriticalErrorListener criticalErrorListener) throws Exception { this.executor = executor; - this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider); + this.criticalErrorListener = criticalErrorListener; + + try { + this.dbDriver = JDBCFileUtils.getDBFileDriver(connection, sqlProvider); + } catch (SQLException e) { + criticalErrorListener.onIOException(e, "Failed to start JDBC Driver", null); + } } public JDBCSequentialFileFactoryDriver getDbDriver() { @@ -74,8 +103,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public SequentialFileFactory setDatasync(boolean enabled) { - - // noop return this; } @@ -92,7 +119,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM started = true; } } catch (Exception e) { - ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database", e); + criticalErrorListener.onIOException(e, "Unable to start database driver", null); started = false; } } @@ -115,7 +142,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM files.add(file); return file; } catch (Exception e) { - ActiveMQJournalLogger.LOGGER.error("Could not create file", e); + criticalErrorListener.onIOException(e, "Error whilst creating JDBC file", null); } return null; } @@ -127,7 +154,12 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public List<String> listFiles(String extension) throws Exception { - return dbDriver.listFiles(extension); + try { + return dbDriver.listFiles(extension); + } catch (SQLException e) { + criticalErrorListener.onIOException(e, "Error listing JDBC files.", null); + throw e; + } } @Override @@ -137,6 +169,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public void onIOError(Exception exception, String message, SequentialFile file) { + criticalErrorListener.onIOException(exception, message, file); } @Override @@ -215,9 +248,20 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public void flush() { + for (SequentialFile file : files) { + try { + file.sync(); + } catch (Exception e) { + criticalErrorListener.onIOException(e, "Error during JDBC file sync.", file); + } + } } public synchronized void destroy() throws SQLException { - dbDriver.destroy(); + try { + dbDriver.destroy(); + } catch (SQLException e) { + logger.error("Error destroying file factory", e); + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ccc4e14/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java index cf8d39d..a901f6a 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java @@ -29,10 +29,13 @@ import java.util.List; import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.jboss.logging.Logger; @SuppressWarnings("SynchronizeOnNonFinalField") public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { + private static final Logger logger = Logger.getLogger(JDBCSequentialFileFactoryDriver.class); + protected PreparedStatement deleteFile; protected PreparedStatement createFile; @@ -157,6 +160,8 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { Blob blob = rs.getBlob(1); if (blob != null) { file.setWritePosition((int) blob.length()); + } else { + logger.warn("ERROR NO BLOB FOR FILE" + "File: " + file.getFileName() + " " + file.getId()); } } connection.commit(); @@ -293,8 +298,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { connection.commit(); return readLength; } catch (Throwable e) { - connection.rollback(); throw e; + } finally { + connection.rollback(); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ccc4e14/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index a548157..64f20a8 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; @@ -82,26 +84,32 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { // Sequence ID for journal records private final AtomicLong seq = new AtomicLong(0); + private final IOCriticalErrorListener criticalIOErrorListener; + public JDBCJournalImpl(DataSource dataSource, SQLProvider provider, String tableName, ScheduledExecutorService scheduledExecutorService, - Executor completeExecutor) { + Executor completeExecutor, + IOCriticalErrorListener criticalIOErrorListener) { super(dataSource, provider); records = new ArrayList<>(); this.scheduledExecutorService = scheduledExecutorService; this.completeExecutor = completeExecutor; + this.criticalIOErrorListener = criticalIOErrorListener; } public JDBCJournalImpl(String jdbcUrl, String jdbcDriverClass, SQLProvider sqlProvider, ScheduledExecutorService scheduledExecutorService, - Executor completeExecutor) { + Executor completeExecutor, + IOCriticalErrorListener criticalIOErrorListener) { super(sqlProvider, jdbcUrl, jdbcDriverClass); records = new ArrayList<>(); this.scheduledExecutorService = scheduledExecutorService; this.completeExecutor = completeExecutor; + this.criticalIOErrorListener = criticalIOErrorListener; } @Override @@ -131,9 +139,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public synchronized void stop() throws SQLException { + public void stop() throws SQLException { + stop(true); + } + + public synchronized void stop(boolean sync) throws SQLException { if (started) { - sync(); + if (sync) + sync(); started = false; super.stop(); } @@ -164,8 +177,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { TransactionHolder holder; - boolean success = false; try { + connection.setAutoCommit(false); + for (JDBCJournalRecord record : recordRef) { switch (record.getRecordType()) { case JDBCJournalRecord.DELETE_RECORD: @@ -195,36 +209,25 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { break; } } - } catch (SQLException e) { - logger.warn(e.getMessage(), e); - executeCallbacks(recordRef, success); - return 0; - } - - try { - connection.setAutoCommit(false); insertJournalRecords.executeBatch(); deleteJournalRecords.executeBatch(); deleteJournalTxRecords.executeBatch(); connection.commit(); - success = true; - } catch (SQLException e) { - logger.warn(e.getMessage(), e); - performRollback(recordRef); - } - try { - if (success) - cleanupTxRecords(deletedRecords, committedTransactions); - } catch (SQLException e) { - logger.warn("Failed to remove the Tx Records", e.getMessage(), e); - } finally { - executeCallbacks(recordRef, success); - } + cleanupTxRecords(deletedRecords, committedTransactions); + executeCallbacks(recordRef, true); + + return recordRef.size(); - return recordRef.size(); + } catch (Exception e) { + criticalIOErrorListener.onIOException(e, "Critical IO Error. Failed to process JDBC Record statements", null); + started = false; + executeCallbacks(recordRef, false); + performRollback(recordRef); + return 0; + } } /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, @@ -260,8 +263,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { private void performRollback(List<JDBCJournalRecord> records) { try { - connection.rollback(); - for (JDBCJournalRecord record : records) { if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { removeTxRecord(record); @@ -277,13 +278,14 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { transactions.remove(txH.transactionID); } } + connection.rollback(); } catch (Exception sqlE) { - logger.warn(sqlE.getMessage(), sqlE); + logger.error(sqlE.getMessage(), sqlE); + criticalIOErrorListener.onIOException(sqlE, sqlE.getMessage(), null); ActiveMQJournalLogger.LOGGER.error("Error performing rollback", sqlE); } } - // TODO Use an executor. private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean result) { Runnable r = new Runnable() { @Override @@ -297,7 +299,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } private void appendRecord(JDBCJournalRecord record) throws Exception { + record.storeLineUp(); + if (!started) { + if (record.getIoCompletion() != null) { + record.getIoCompletion().onError(ActiveMQExceptionType.IO_ERROR.getCode(), "JDBC Journal not started"); + } + } SimpleWaitIOCallback callback = null; if (record.isSync() && record.getIoCompletion() == null) { @@ -316,10 +324,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } syncTimer.delay(); - - if (callback != null) { - callback.waitCompletion(); - } + if (callback != null) callback.waitCompletion(); } private synchronized void addTxRecord(JDBCJournalRecord record) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ccc4e14/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java index 9691d3e..8888e02 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java @@ -26,6 +26,7 @@ import java.sql.SQLException; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.RecordInfo; @@ -115,7 +116,7 @@ class JDBCJournalRecord { if (success) { ioCompletion.done(); } else { - ioCompletion.onError(1, "DATABASE TRANSACTION FAILED"); + ioCompletion.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "JDBC Transaction failed."); } } } @@ -126,7 +127,7 @@ class JDBCJournalRecord { } } - void writeRecord(PreparedStatement statement) throws SQLException { + void writeRecord(PreparedStatement statement) throws Exception { byte[] recordBytes = new byte[variableSize]; byte[] txDataBytes = new byte[txDataSize]; @@ -136,6 +137,7 @@ class JDBCJournalRecord { txData.read(txDataBytes); } catch (IOException e) { ActiveMQJournalLogger.LOGGER.error("Error occurred whilst reading Journal Record", e); + throw e; } statement.setLong(1, id); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ccc4e14/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java index b7d0c9d..b04b74f 100644 --- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java +++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; @@ -64,7 +65,11 @@ public class JDBCSequentialFileFactoryTest { String connectionUrl = "jdbc:derby:target/data;create=true"; String tableName = "FILES"; - factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor); + factory = new JDBCSequentialFileFactory(connectionUrl, className, JDBCUtils.getSQLProvider(className, tableName, SQLProvider.DatabaseStoreType.PAGE), executor, new IOCriticalErrorListener() { + @Override + public void onIOException(Throwable code, String message, SequentialFile file) { + } + }); factory.start(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ccc4e14/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java index 4591c8b..b2a3eb5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryDatabase.java @@ -81,6 +81,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { private boolean started = false; + private final IOCriticalErrorListener criticalErrorListener; + public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf, final StorageManager storageManager, final long syncTimeout, @@ -94,6 +96,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { this.scheduledExecutor = scheduledExecutor; this.syncTimeout = syncTimeout; this.dbConf = dbConf; + this.criticalErrorListener = critialErrorListener; start(); } @@ -109,9 +112,11 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { if (sqlProviderFactory == null) { sqlProviderFactory = new GenericSQLProvider.Factory(); } + pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor()); } else { String driverClassName = dbConf.getJdbcDriverClassName(); + pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName(), SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener); pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor()); } pagingFactoryFileFactory.start(); @@ -222,7 +227,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory { sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE); } - return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor()); + return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor(), criticalErrorListener); } private String getTableNameForGUID(String guid) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ccc4e14/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java index 8634638..5592c9e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java @@ -62,14 +62,14 @@ public class JDBCJournalStorageManager extends JournalStorageManager { if (sqlProviderFactory == null) { sqlProviderFactory = new GenericSQLProvider.Factory(); } - bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor()); - messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor()); - largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor); + bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); + messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); + largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener); } else { String driverClassName = dbConf.getJdbcDriverClassName(); - bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor()); - messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor()); - largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor); + bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); + messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); + largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener); } largeMessagesFactory.start(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2ccc4e14/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java index ebb5c0e..8f1d25a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java @@ -26,6 +26,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; @@ -77,7 +79,12 @@ public class JDBCJournalTest extends ActiveMQTestBase { executorService = Executors.newSingleThreadExecutor(); jdbcUrl = "jdbc:derby:target/data;create=true"; SQLProvider.Factory factory = new DerbySQLProvider.Factory(); - journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService); + journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService, new IOCriticalErrorListener() { + @Override + public void onIOException(Throwable code, String message, SequentialFile file) { + + } + }); journal.start(); }
