Repository: activemq-artemis Updated Branches: refs/heads/1.x 8a3f4ccd6 -> a03db5baf
ARTEMIS-1125 Persist JMS Bindings in Database on JDBC Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9c013e74 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9c013e74 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9c013e74 Branch: refs/heads/1.x Commit: 9c013e74cba7ee4b3bc766029ff43c3b306ba4d5 Parents: 8a3f4cc Author: Martyn Taylor <[email protected]> Authored: Thu Apr 20 18:31:24 2017 +0100 Committer: Martyn Taylor <[email protected]> Committed: Thu Apr 20 21:04:26 2017 +0100 ---------------------------------------------------------------------- .../config/ActiveMQDefaultConfiguration.java | 10 ++++- .../journal/JMSJournalStorageManagerImpl.java | 42 ++++++++++++++++---- .../jms/server/impl/JMSServerManagerImpl.java | 6 +-- .../storage/DatabaseStorageConfiguration.java | 10 +++++ .../deployers/impl/FileConfigurationParser.java | 1 + .../artemis/core/server/ActiveMQServer.java | 3 ++ .../core/server/impl/ActiveMQServerImpl.java | 5 +++ .../resources/schema/artemis-configuration.xsd | 7 ++++ .../artemis/tests/util/ActiveMQTestBase.java | 2 + docs/user-manual/en/persistence.md | 8 ++++ .../persistence/StorageManagerTestBase.java | 2 +- 11 files changed, 83 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c013e74/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 5d86243..48b46a9 100644 --- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -428,11 +428,15 @@ public final class ActiveMQDefaultConfiguration { // Default large messages table name, used with Database storage type private static final String DEFAULT_LARGE_MESSAGES_TABLE_NAME = "LARGE_MESSAGES"; - // Default large messages table name, used with Database storage type + // Default Page Store table name, used with Database storage type private static final String DEFAULT_PAGE_STORE_TABLE_NAME = "PAGE_STORE"; + private static final int DEFAULT_JDBC_NETWORK_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20); + // Default JMS Bindings table name, used with Database storage type + private static final String DEFAULT_JMS_BINDINGS_TABLE_NAME = "JMS_BINDINGS"; + // Default period to wait between connection TTL checks public static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000; @@ -1180,6 +1184,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_JDBC_NETWORK_TIMEOUT; } + public static String getDefaultJMSBindingsTableName() { + return DEFAULT_JMS_BINDINGS_TABLE_NAME; + } + public static long getDefaultConnectionTtlCheckInterval() { return DEFAULT_CONNECTION_TTL_CHECK_INTERVAL; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c013e74/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java index bc288db..9cd3976 100644 --- 
a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java @@ -22,11 +22,15 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.StoreConfiguration; +import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.Journal; @@ -36,6 +40,10 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.replication.ReplicatedJournal; import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; +import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl; +import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jms.persistence.JMSStorageManager; import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings; import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFactory; @@ -75,10 +83,13 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager { // 
Static -------------------------------------------------------- // Constructors -------------------------------------------------- + public JMSJournalStorageManagerImpl(ExecutorFactory ioExecutors, final IDGenerator idGenerator, final Configuration config, - final ReplicationManager replicator) { + final ReplicationManager replicator, + final ScheduledExecutorService scheduledExecutorService, + final IOCriticalErrorListener criticalErrorListener) { final EnumSet<JournalType> supportedJournalTypes = EnumSet.allOf(JournalType.class); if (!supportedJournalTypes.contains(config.getJournalType())) { throw new IllegalArgumentException("Only " + supportedJournalTypes + " are supported Journal types"); @@ -88,14 +99,29 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager { createDir = config.isCreateBindingsDir(); - SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1); - - Journal localJMS = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1, 0); - - if (replicator != null) { - jmsJournal = new ReplicatedJournal((byte) 2, localJMS, replicator); - } else { + Journal localJMS; + if (config.getStoreConfiguration() != null && config.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { + DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration(); + if (dbConf.getDataSource() != null) { + SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory(); + if (sqlProviderFactory == null) { + sqlProviderFactory = new GenericSQLProvider.Factory(); + } + localJMS = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getJMSBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, ioExecutors.getExecutor(), 
criticalErrorListener); + } else { + String driverClassName = dbConf.getJdbcDriverClassName(); + localJMS = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getJMSBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, ioExecutors.getExecutor(), criticalErrorListener); + } jmsJournal = localJMS; + } else { + SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1); + localJMS = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1, 0); + + if (replicator != null) { + jmsJournal = new ReplicatedJournal((byte) 2, localJMS, replicator); + } else { + jmsJournal = localJMS; + } } this.idGenerator = idGenerator; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c013e74/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java index b5bc701..9ce3299 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java @@ -1523,7 +1523,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback private void initJournal() throws Exception { this.coreConfig = server.getConfiguration(); - createJournal(); + createJournal(server); storage.load(); @@ -1547,12 +1547,12 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback /** * @throws Exception */ - 
private void createJournal() throws Exception { + private void createJournal(ActiveMQServer activeMQserver) throws Exception { if (storage != null) { storage.stop(); } if (coreConfig.isPersistenceEnabled()) { - storage = new JMSJournalStorageManagerImpl(server.getIOExecutorFactory(), new TimeAndCounterIDGenerator(), server.getConfiguration(), server.getReplicationManager()); + storage = new JMSJournalStorageManagerImpl(server.getIOExecutorFactory(), new TimeAndCounterIDGenerator(), server.getConfiguration(), server.getReplicationManager(), server.getScheduledPool(), activeMQserver.getCriticalIOErrorListener()); } else { storage = new NullJMSStorageManagerImpl(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c013e74/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java index 76626c0..285d1cf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java @@ -32,6 +32,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { private String pageStoreTableName = ActiveMQDefaultConfiguration.getDefaultPageStoreTableName(); + private String jmsBindingsTableName = ActiveMQDefaultConfiguration.getDefaultJMSBindingsTableName(); + private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl(); private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName(); @@ -79,6 +81,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { 
this.pageStoreTableName = pageStoreTableName; } + public String getJMSBindingsTableName() { + return jmsBindingsTableName; + } + + public void setJMSBindingsTableName(String jmsBindingsTableName) { + this.jmsBindingsTableName = jmsBindingsTableName; + } + public void setJdbcConnectionUrl(String jdbcConnectionUrl) { this.jdbcConnectionUrl = jdbcConnectionUrl; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c013e74/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index c36699e..0b82a90 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1157,6 +1157,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK)); conf.setLargeMessageTableName(getString(storeNode, "large-message-table-name", conf.getJdbcConnectionUrl(), Validators.NO_CHECK)); conf.setPageStoreTableName(getString(storeNode, "page-store-table-name", conf.getPageStoreTableName(), Validators.NO_CHECK)); + conf.setJMSBindingsTableName(getString(storeNode, "jms-bindings-table-name", conf.getJMSBindingsTableName(), Validators.NO_CHECK)); conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK)); conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK)); conf.setJdbcNetworkTimeout(getInteger(storeNode, 
"jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c013e74/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index f8a5d75..8703e83 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.DivertConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.persistence.OperationContext; @@ -421,4 +422,6 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean addClientConnection(String clientId, boolean unique); void removeClientConnection(String clientId); + + IOCriticalErrorListener getCriticalIOErrorListener(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c013e74/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 31aa178..f25b115 
100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2493,6 +2493,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { } + @Override + public IOCriticalErrorListener getCriticalIOErrorListener() { + return shutdownOnCriticalIO; + } + private final class ConfigurationFileReloader implements ReloadCallback { @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c013e74/artemis-server/src/main/resources/schema/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 6fa5fc4..d0c0b25 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1714,6 +1714,13 @@ </xsd:documentation> </xsd:annotation> </xsd:element> + <xsd:element name="jms-bindings-table-name" type="xsd:string" minOccurs="1" maxOccurs="1"> + <xsd:annotation> + <xsd:documentation> + The table name used to store JMS binding journal entries + </xsd:documentation> + </xsd:annotation> + </xsd:element> </xsd:all> </xsd:complexType> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c013e74/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 7d10245..47c916a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -414,6 +414,7 @@ public abstract class ActiveMQTestBase extends Assert { protected Configuration createDefaultJDBCConfig(boolean isNetty) throws Exception { Configuration configuration = createDefaultConfig(isNetty); setDBStoreType(configuration); + configuration.setPersistenceEnabled(true); return configuration; } @@ -462,6 +463,7 @@ public abstract class ActiveMQTestBase extends Assert { dbStorageConfiguration.setMessageTableName("MESSAGE"); dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE"); dbStorageConfiguration.setPageStoreTableName("PAGE_STORE"); + dbStorageConfiguration.setJMSBindingsTableName("JMS_BINDINGS"); dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName()); configuration.setStoreConfiguration(dbStorageConfiguration); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c013e74/docs/user-manual/en/persistence.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/persistence.md b/docs/user-manual/en/persistence.md index 7e07773..8a422ed 100644 --- a/docs/user-manual/en/persistence.md +++ b/docs/user-manual/en/persistence.md @@ -420,6 +420,14 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a The name of the table in which messages and related data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference. +- `page-store-table-name` + + The name of the table in which paged messages are stored. Specifying table names allows users to share single database amongst multiple servers, without interference. + +- `jms-bindings-table-name` + + The name of the table in which JMS bindings data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference. 
+ - `jdbc-driver-class-name` The fully qualified class name of the desired database Driver. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c013e74/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java index a104363..9504417 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java @@ -155,7 +155,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase { * @throws Exception */ protected void createJMSStorage() throws Exception { - jmsJournal = new JMSJournalStorageManagerImpl(null, new TimeAndCounterIDGenerator(), createDefaultInVMConfig(), null); + jmsJournal = new JMSJournalStorageManagerImpl(null, new TimeAndCounterIDGenerator(), createDefaultInVMConfig(), null, scheduledExecutorService, null); addActiveMQComponent(jmsJournal); jmsJournal.start(); jmsJournal.load();
