ARTEMIS-1447 JDBC NodeManager to support JDBC HA Shared Store (cherry picked from commit 7944a25269d939791bfbc2637e3c649a9137ad45)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/565b8175 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/565b8175 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/565b8175 Branch: refs/heads/1.x Commit: 565b817592573b508a08197a6da1c2649a678be8 Parents: b267e8a Author: Francesco Nigro <[email protected]> Authored: Sat Sep 9 18:41:30 2017 +0200 Committer: Clebert Suconic <[email protected]> Committed: Wed Mar 28 11:54:15 2018 -0400 ---------------------------------------------------------------------- .../config/ActiveMQDefaultConfiguration.java | 25 ++ .../jdbc/store/sql/GenericSQLProvider.java | 182 ++++++++- .../artemis/jdbc/store/sql/SQLProvider.java | 39 +- artemis-server/pom.xml | 7 +- .../storage/DatabaseStorageConfiguration.java | 40 ++ .../deployers/impl/FileConfigurationParser.java | 3 + .../artemis/core/server/NodeManager.java | 2 +- .../core/server/impl/ActiveMQServerImpl.java | 7 + .../impl/jdbc/ActiveMQScheduledLeaseLock.java | 115 ++++++ .../core/server/impl/jdbc/JdbcLeaseLock.java | 277 ++++++++++++++ .../core/server/impl/jdbc/JdbcNodeManager.java | 380 +++++++++++++++++++ .../impl/jdbc/JdbcSharedStateManager.java | 302 +++++++++++++++ .../core/server/impl/jdbc/LeaseLock.java | 151 ++++++++ .../server/impl/jdbc/ScheduledLeaseLock.java | 44 +++ .../server/impl/jdbc/SharedStateManager.java | 60 +++ .../resources/schema/artemis-configuration.xsd | 21 + .../server/impl/jdbc/JdbcLeaseLockTest.java | 231 +++++++++++ docs/user-manual/en/persistence.md | 15 + 18 files changed, 1890 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 6188dbb..c9f775e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -437,9 +437,18 @@ public final class ActiveMQDefaultConfiguration { // Default Page Store table name, used with Database storage type private static final String DEFAULT_PAGE_STORE_TABLE_NAME = "PAGE_STORE"; + // Default node manager store table name, used with Database storage type + private static final String DEFAULT_NODE_MANAGER_STORE_TABLE_NAME = "NODE_MANAGER_STORE"; + private static final int DEFAULT_JDBC_NETWORK_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20); + private static final long DEFAULT_JDBC_LOCK_RENEW_PERIOD_MILLIS = TimeUnit.SECONDS.toMillis(4); + + private static final long DEFAULT_JDBC_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(20); + + private static final long DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60); + // Default JMS Bingings table name, used with Database storage type private static final String DEFAULT_JMS_BINDINGS_TABLE_NAME = "JMS_BINDINGS"; @@ -1197,10 +1206,26 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_PAGE_STORE_TABLE_NAME; } + public static String getDefaultNodeManagerStoreTableName() { + return DEFAULT_NODE_MANAGER_STORE_TABLE_NAME; + } + public static int getDefaultJdbcNetworkTimeout() { return DEFAULT_JDBC_NETWORK_TIMEOUT; } + public static long getDefaultJdbcLockRenewPeriodMillis() { + return DEFAULT_JDBC_LOCK_RENEW_PERIOD_MILLIS; + } + + public static long getDefaultJdbcLockExpirationMillis() { + return DEFAULT_JDBC_LOCK_EXPIRATION_MILLIS; + } + + public static long getDefaultJdbcLockAcquisitionTimeoutMillis() { + return DEFAULT_JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS; + } + public static String getDefaultJMSBindingsTableName() { return DEFAULT_JMS_BINDINGS_TABLE_NAME; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java index 9232001..ac793d3 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java @@ -18,6 +18,14 @@ package org.apache.activemq.artemis.jdbc.store.sql; public class GenericSQLProvider implements SQLProvider { + /** + * The JDBC Node Manager shared state is contained in these 4 rows: each one is used exclusively for a specific purpose. + */ + private static final int STATE_ROW_ID = 0; + private static final int LIVE_LOCK_ROW_ID = 1; + private static final int BACKUP_LOCK_ROW_ID = 2; + private static final int NODE_ID_ROW_ID = 3; + // Default to lowest (MYSQL = 64k) private static final long MAX_BLOB_SIZE = 64512; @@ -57,6 +65,42 @@ public class GenericSQLProvider implements SQLProvider { private final String countJournalRecordsSQL; + private final String createNodeManagerStoreTableSQL; + + private final String createStateSQL; + + private final String createNodeIdSQL; + + private final String createLiveLockSQL; + + private final String createBackupLockSQL; + + private final String tryAcquireLiveLockSQL; + + private final String tryAcquireBackupLockSQL; + + private final String tryReleaseLiveLockSQL; + + private final String tryReleaseBackupLockSQL; + + private final String isLiveLockedSQL; + + private final String isBackupLockedSQL; + + private final String renewLiveLockSQL; + + private final String renewBackupLockSQL; + + private final String currentTimestampSQL; + + private final String writeStateSQL; + + private final String readStateSQL; + + private final String writeNodeIdSQL; + + private final String readNodeIdSQL; + protected final DatabaseStoreType databaseStoreType; protected GenericSQLProvider(String tableName, DatabaseStoreType databaseStoreType) { @@ -64,8 +108,7 @@ public class GenericSQLProvider implements SQLProvider { this.databaseStoreType = databaseStoreType; - createFileTableSQL = "CREATE TABLE " + tableName + - "(ID BIGINT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))"; + createFileTableSQL = "CREATE TABLE " + tableName + "(ID BIGINT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))"; insertFileSQL = "INSERT INTO " + tableName + " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)"; @@ -81,17 +124,13 @@ public class GenericSQLProvider implements SQLProvider { updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? WHERE ID=?"; - cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " + - "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)"; + cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " + "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)"; copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?"; dropFileTableSQL = "DROP TABLE " + tableName; - createJournalTableSQL = new String[] { - "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT NOT NULL, PRIMARY KEY(seq))", - "CREATE INDEX " + tableName + "_IDX ON " + tableName + " (id)" - }; + createJournalTableSQL = new String[]{"CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT NOT NULL, PRIMARY KEY(seq))", "CREATE INDEX " + tableName + "_IDX ON " + tableName + " (id)"}; insertJournalRecordsSQL = "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)"; @@ -102,6 +141,43 @@ public class GenericSQLProvider implements SQLProvider { deleteJournalTxRecordsSQL = "DELETE FROM " + tableName + " WHERE txId=?"; countJournalRecordsSQL = "SELECT COUNT(*) FROM " + tableName; + + createNodeManagerStoreTableSQL = "CREATE TABLE " + tableName + " ( ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME TIMESTAMP, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))"; + + createStateSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + STATE_ROW_ID + ")"; + + createNodeIdSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + NODE_ID_ROW_ID + ")"; + + createLiveLockSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + LIVE_LOCK_ROW_ID + ")"; + + createBackupLockSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + BACKUP_LOCK_ROW_ID + ")"; + + tryAcquireLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = " + LIVE_LOCK_ROW_ID; + + tryAcquireBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = " + BACKUP_LOCK_ROW_ID; + + tryReleaseLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = " + LIVE_LOCK_ROW_ID; + + tryReleaseBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = " + BACKUP_LOCK_ROW_ID; + + isLiveLockedSQL = "SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM " + tableName + " WHERE ID = " + LIVE_LOCK_ROW_ID; + + isBackupLockedSQL = "SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM " + tableName + " WHERE ID = " + BACKUP_LOCK_ROW_ID; + + renewLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = " + LIVE_LOCK_ROW_ID; + + renewBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = " + BACKUP_LOCK_ROW_ID; + + currentTimestampSQL = "SELECT CURRENT_TIMESTAMP FROM " + tableName; + + writeStateSQL = "UPDATE " + tableName + " SET STATE = ? WHERE ID = " + STATE_ROW_ID; + + readStateSQL = "SELECT STATE FROM " + tableName + " WHERE ID = " + STATE_ROW_ID; + + writeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE ID = " + NODE_ID_ROW_ID; + + readNodeIdSQL = "SELECT NODE_ID FROM " + tableName + " WHERE ID = " + NODE_ID_ROW_ID; + } @Override @@ -202,6 +278,96 @@ public class GenericSQLProvider implements SQLProvider { } @Override + public String createNodeManagerStoreTableSQL() { + return createNodeManagerStoreTableSQL; + } + + @Override + public String createStateSQL() { + return createStateSQL; + } + + @Override + public String createNodeIdSQL() { + return createNodeIdSQL; + } + + @Override + public String createLiveLockSQL() { + return createLiveLockSQL; + } + + @Override + public String createBackupLockSQL() { + return createBackupLockSQL; + } + + @Override + public String tryAcquireLiveLockSQL() { + return tryAcquireLiveLockSQL; + } + + @Override + public String tryAcquireBackupLockSQL() { + return tryAcquireBackupLockSQL; + } + + @Override + public String tryReleaseLiveLockSQL() { + return tryReleaseLiveLockSQL; + } + + @Override + public String tryReleaseBackupLockSQL() { + return tryReleaseBackupLockSQL; + } + + @Override + public String isLiveLockedSQL() { + return isLiveLockedSQL; + } + + @Override + public String isBackupLockedSQL() { + return isBackupLockedSQL; + } + + @Override + public String renewLiveLockSQL() { + return renewLiveLockSQL; + } + + @Override + public String renewBackupLockSQL() { + return renewBackupLockSQL; + } + + @Override + public String currentTimestampSQL() { + return currentTimestampSQL; + } + + @Override + public String writeStateSQL() { + return writeStateSQL; + } + + @Override + public String readStateSQL() { + return readStateSQL; + } + + @Override + public String writeNodeIdSQL() { + return writeNodeIdSQL; + } + + @Override + public String readNodeIdSQL() { + return readNodeIdSQL; + } + + @Override public boolean closeConnectionOnShutdown() { return true; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java index 1663179..b4b55d5 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java @@ -19,7 +19,7 @@ package org.apache.activemq.artemis.jdbc.store.sql; public interface SQLProvider { enum DatabaseStoreType { - PAGE, MESSAGE_JOURNAL, BINDINGS_JOURNAL, LARGE_MESSAGE + PAGE, MESSAGE_JOURNAL, BINDINGS_JOURNAL, LARGE_MESSAGE, NODE_MANAGER } long getMaxBlobSize(); @@ -62,7 +62,44 @@ public interface SQLProvider { boolean closeConnectionOnShutdown(); + String createNodeManagerStoreTableSQL(); + + String createStateSQL(); + + String createNodeIdSQL(); + + String createLiveLockSQL(); + + String createBackupLockSQL(); + + String tryAcquireLiveLockSQL(); + + String tryAcquireBackupLockSQL(); + + String tryReleaseLiveLockSQL(); + + String tryReleaseBackupLockSQL(); + + String isLiveLockedSQL(); + + String isBackupLockedSQL(); + + String renewLiveLockSQL(); + + String renewBackupLockSQL(); + + String currentTimestampSQL(); + + String writeStateSQL(); + + String readStateSQL(); + + String writeNodeIdSQL(); + + String readNodeIdSQL(); + interface Factory { + SQLProvider create(String tableName, DatabaseStoreType dbStoreType); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml index 767cfaa..1f68af2 100644 --- a/artemis-server/pom.xml +++ b/artemis-server/pom.xml @@ -124,7 +124,12 @@ <scope>test</scope> <type>test-jar</type> </dependency> - + <!-- db test --> + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <scope>test</scope> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java index 285d1cf..5429f0c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/storage/DatabaseStorageConfiguration.java @@ -34,6 +34,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { private String jmsBindingsTableName = ActiveMQDefaultConfiguration.getDefaultJMSBindingsTableName(); + private String nodeManagerStoreTableName = ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(); + private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl(); private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName(); @@ -44,6 +46,12 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { private int jdbcNetworkTimeout = ActiveMQDefaultConfiguration.getDefaultJdbcNetworkTimeout(); + private long jdbcLockRenewPeriodMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis(); + + private long jdbcLockExpirationMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis(); + + private long jdbcLockAcquisitionTimeoutMillis = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis(); + @Override public StoreType getStoreType() { return StoreType.DATABASE; @@ -77,6 +85,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { return pageStoreTableName; } + public void setNodeManagerStoreTableName(String nodeManagerStoreTableName) { + this.nodeManagerStoreTableName = nodeManagerStoreTableName; + } + + public String getNodeManagerStoreTableName() { + return nodeManagerStoreTableName; + } + public void setPageStoreTableName(String pageStoreTableName) { this.pageStoreTableName = pageStoreTableName; } @@ -145,4 +161,28 @@ public class DatabaseStorageConfiguration implements StoreConfiguration { public void setJdbcNetworkTimeout(int jdbcNetworkTimeout) { this.jdbcNetworkTimeout = jdbcNetworkTimeout; } + + public long getJdbcLockRenewPeriodMillis() { + return jdbcLockRenewPeriodMillis; + } + + public void setJdbcLockRenewPeriodMillis(long jdbcLockRenewPeriodMillis) { + this.jdbcLockRenewPeriodMillis = jdbcLockRenewPeriodMillis; + } + + public long getJdbcLockExpirationMillis() { + return jdbcLockExpirationMillis; + } + + public void setJdbcLockExpirationMillis(long jdbcLockExpirationMillis) { + this.jdbcLockExpirationMillis = jdbcLockExpirationMillis; + } + + public long getJdbcLockAcquisitionTimeoutMillis() { + return jdbcLockAcquisitionTimeoutMillis; + } + + public void setJdbcLockAcquisitionTimeoutMillis(long jdbcLockAcquisitionTimeoutMillis) { + this.jdbcLockAcquisitionTimeoutMillis = jdbcLockAcquisitionTimeoutMillis; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index cc30163..e69c486 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1164,6 +1164,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK)); conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK)); conf.setJdbcNetworkTimeout(getInteger(storeNode, "jdbc-network-timeout", conf.getJdbcNetworkTimeout(), Validators.NO_CHECK)); + conf.setJdbcLockAcquisitionTimeoutMillis(getLong(storeNode, "jdbc-lock-acquisition-timeout", conf.getJdbcLockAcquisitionTimeoutMillis(), Validators.NO_CHECK)); + conf.setJdbcLockRenewPeriodMillis(getLong(storeNode, "jdbc-lock-renew-period", conf.getJdbcLockRenewPeriodMillis(), Validators.NO_CHECK)); + conf.setJdbcLockExpirationMillis(getLong(storeNode, "jdbc-lock-expiration", conf.getJdbcLockExpirationMillis(), Validators.NO_CHECK)); return conf; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java index 28f05b2..e963b22 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java @@ -127,7 +127,7 @@ public abstract class NodeManager implements ActiveMQComponent { isStarted = false; } - public final void stopBackup() throws Exception { + public void stopBackup() throws Exception { if (replicatedBackup && getNodeId() != null) { setUpServerLockFile(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index bb78608..ab47aa7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -138,6 +138,7 @@ import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler; import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler; +import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl; import org.apache.activemq.artemis.core.server.reload.ReloadCallback; @@ -448,6 +449,12 @@ public class ActiveMQServerImpl implements ActiveMQServer { NodeManager manager; if (!configuration.isPersistenceEnabled()) { manager = new InVMNodeManager(replicatingBackup); + } else if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { + if (replicatingBackup) { + throw new IllegalArgumentException("replicatingBackup is not supported yet while using JDBC persistence"); + } + final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration(); + manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory, shutdownOnCriticalIO); } else { manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java new file mode 100644 index 0000000..30db629 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl.jdbc; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.jboss.logging.Logger; + +/** + * Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, IOCriticalErrorListener)}. + */ +final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implements ScheduledLeaseLock { + + private static final Logger LOGGER = Logger.getLogger(ActiveMQScheduledLeaseLock.class); + + private final String lockName; + private final LeaseLock lock; + private long lastLockRenewStart; + private final long renewPeriodMillis; + private final IOCriticalErrorListener ioCriticalErrorListener; + + ActiveMQScheduledLeaseLock(ScheduledExecutorService scheduledExecutorService, + ArtemisExecutor executor, + String lockName, + LeaseLock lock, + long renewPeriodMillis, + IOCriticalErrorListener ioCriticalErrorListener) { + super(scheduledExecutorService, executor, 0, renewPeriodMillis, TimeUnit.MILLISECONDS, false); + if (renewPeriodMillis >= lock.expirationMillis()) { + throw new IllegalArgumentException("renewPeriodMillis must be < lock's expirationMillis"); + } + this.lockName = lockName; + this.lock = lock; + this.renewPeriodMillis = renewPeriodMillis; + //already expired start time + this.lastLockRenewStart = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(lock.expirationMillis()); + this.ioCriticalErrorListener = ioCriticalErrorListener; + } + + @Override + public long renewPeriodMillis() { + return renewPeriodMillis; + } + + @Override + public LeaseLock lock() { + return lock; + } + + @Override + public synchronized void start() { + if (isStarted()) { + return; + } + this.lastLockRenewStart = System.nanoTime(); + super.start(); + } + + @Override + public synchronized void stop() { + if (!isStarted()) { + return; + } + super.stop(); + } + + @Override + public void run() { + final long lastRenewStart = this.lastLockRenewStart; + final long renewStart = System.nanoTime(); + if (!this.lock.renew()) { + ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null); + } + //logic to detect slowness of DB and/or the scheduled executor service + detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis()); + this.lastLockRenewStart = lastRenewStart; + } + + private static void detectAndReportRenewSlowness(String lockName, + long lastRenewStart, + long renewStart, + long expectedRenewPeriodMillis, + long expirationMillis) { + final long elapsedMillisToRenew = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - renewStart); + if (elapsedMillisToRenew > expectedRenewPeriodMillis) { + LOGGER.error(lockName + " lock renew tooks " + elapsedMillisToRenew + " ms, while is supposed to take <" + expectedRenewPeriodMillis + " ms"); + } + final long measuredRenewPeriodNanos = renewStart - lastRenewStart; + final long measuredRenewPeriodMillis = TimeUnit.NANOSECONDS.toMillis(measuredRenewPeriodNanos); + if (measuredRenewPeriodMillis > expirationMillis) { + LOGGER.error(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms"); + } else if (measuredRenewPeriodMillis > expectedRenewPeriodMillis) { + LOGGER.warn(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms"); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java new file mode 100644 index 0000000..0656235 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl.jdbc; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.Objects; +import java.util.function.Predicate; + +import org.jboss.logging.Logger; + +/** + * JDBC implementation of a {@link LeaseLock} with a {@code String} defined {@link #holderId()}. + */ +final class JdbcLeaseLock implements LeaseLock { + + private static final Logger LOGGER = Logger.getLogger(JdbcLeaseLock.class); + private static final int MAX_HOLDER_ID_LENGTH = 128; + private final Connection connection; + private final long maxAllowableMillisDiffFromDBTime; + private long millisDiffFromCurrentTime; + private final String holderId; + private final PreparedStatement tryAcquireLock; + private final PreparedStatement tryReleaseLock; + private final PreparedStatement renewLock; + private final PreparedStatement isLocked; + private final PreparedStatement currentDateTime; + private final long expirationMillis; + private boolean maybeAcquired; + + /** + * The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection}, + * whose life cycle will be managed externally. + */ + JdbcLeaseLock(String holderId, + Connection connection, + PreparedStatement tryAcquireLock, + PreparedStatement tryReleaseLock, + PreparedStatement renewLock, + PreparedStatement isLocked, + PreparedStatement currentDateTime, + long expirationMIllis, + long maxAllowableMillisDiffFromDBTime) { + if (holderId.length() > MAX_HOLDER_ID_LENGTH) { + throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH); + } + this.holderId = holderId; + this.maxAllowableMillisDiffFromDBTime = maxAllowableMillisDiffFromDBTime; + this.millisDiffFromCurrentTime = Long.MAX_VALUE; + this.tryAcquireLock = tryAcquireLock; + this.tryReleaseLock = tryReleaseLock; + this.renewLock = renewLock; + this.isLocked = isLocked; + this.currentDateTime = currentDateTime; + this.expirationMillis = expirationMIllis; + this.maybeAcquired = false; + this.connection = connection; + } + + public String holderId() { + return holderId; + } + + @Override + public long expirationMillis() { + return expirationMillis; + } + + private long timeDifference() throws SQLException { + if (Long.MAX_VALUE == millisDiffFromCurrentTime) { + if (maxAllowableMillisDiffFromDBTime > 0) { + millisDiffFromCurrentTime = determineTimeDifference(); + } else { + millisDiffFromCurrentTime = 0L; + } + } + return millisDiffFromCurrentTime; + } + + private long determineTimeDifference() throws SQLException { + try (ResultSet resultSet = currentDateTime.executeQuery()) { + long result = 0L; + if (resultSet.next()) { + final Timestamp timestamp = resultSet.getTimestamp(1); + final long diff = System.currentTimeMillis() - timestamp.getTime(); + if (Math.abs(diff) > maxAllowableMillisDiffFromDBTime) { + // off by more than maxAllowableMillisDiffFromDBTime so lets adjust + result = (-diff); + } + LOGGER.info(holderId() + " diff adjust from db: " + result + ", db time: " + timestamp); + } + return result; + } + } + + @Override + public boolean renew() { + synchronized (connection) { + try { + final boolean result; + connection.setAutoCommit(false); + try { + final long timeDifference = timeDifference(); + final PreparedStatement preparedStatement = this.renewLock; + final long now = System.currentTimeMillis() + timeDifference; + final Timestamp timestamp = new Timestamp(now + expirationMillis); + preparedStatement.setTimestamp(1, timestamp); + preparedStatement.setString(2, holderId); + result = preparedStatement.executeUpdate() == 1; + } catch (SQLException ie) { + connection.rollback(); + connection.setAutoCommit(true); + throw new IllegalStateException(ie); + } + connection.commit(); + connection.setAutoCommit(true); + return result; + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + } + + @Override + public boolean tryAcquire() { + synchronized (connection) { + try { + final boolean acquired; + connection.setAutoCommit(false); + try { + final long timeDifference = timeDifference(); + final PreparedStatement preparedStatement = tryAcquireLock; + final long now = System.currentTimeMillis() + timeDifference; + preparedStatement.setString(1, holderId); + final Timestamp timestamp = new Timestamp(now + expirationMillis); + preparedStatement.setTimestamp(2, timestamp); + acquired = preparedStatement.executeUpdate() == 1; + } catch (SQLException ie) { + connection.rollback(); + connection.setAutoCommit(true); + throw new IllegalStateException(ie); + } + connection.commit(); + connection.setAutoCommit(true); + if (acquired) { + this.maybeAcquired = true; + } + return acquired; + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + } + + @Override + public boolean isHeld() { + return checkValidHolderId(Objects::nonNull); + } + + @Override + public boolean isHeldByCaller() { + return checkValidHolderId(this.holderId::equals); + } + + private boolean checkValidHolderId(Predicate<? super String> holderIdFilter) { + synchronized (connection) { + try { + boolean result; + connection.setAutoCommit(false); + try { + final long timeDifference = timeDifference(); + final PreparedStatement preparedStatement = this.isLocked; + try (ResultSet resultSet = preparedStatement.executeQuery()) { + if (!resultSet.next()) { + result = false; + } else { + final String currentHolderId = resultSet.getString(1); + result = holderIdFilter.test(currentHolderId); + //warn about any zombie lock + final Timestamp timestamp = resultSet.getTimestamp(2); + if (timestamp != null) { + final long lockExpirationTime = timestamp.getTime(); + final long now = System.currentTimeMillis() + timeDifference; + final long expiredBy = now - lockExpirationTime; + if (expiredBy > 0) { + result = false; + LOGGER.warn("found zombie lock with holderId: " + currentHolderId + " expired by: " + expiredBy + " ms"); + } + } + } + } + } catch (SQLException ie) { + connection.rollback(); + connection.setAutoCommit(true); + throw new IllegalStateException(ie); + } + connection.commit(); + connection.setAutoCommit(true); + return result; + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + } + + @Override + public void release() { + synchronized (connection) { + try { + connection.setAutoCommit(false); + try { + final PreparedStatement preparedStatement = this.tryReleaseLock; + preparedStatement.setString(1, holderId); + if (preparedStatement.executeUpdate() != 1) { + LOGGER.warn(holderId + " has failed to release a lock"); + } else { + LOGGER.info(holderId + " has released a lock"); + } + //consider it as released to avoid on finalize to be reclaimed + this.maybeAcquired = false; + } catch (SQLException ie) { + connection.rollback(); + connection.setAutoCommit(true); + throw new IllegalStateException(ie); + } + connection.commit(); + connection.setAutoCommit(true); + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + } + + @Override + public void close() throws SQLException { + synchronized (connection) { + //to avoid being called if not needed + if (!this.tryReleaseLock.isClosed()) { + try { + if (this.maybeAcquired) { + release(); + } + } finally { + this.tryReleaseLock.close(); + this.tryAcquireLock.close(); + this.renewLock.close(); + this.isLocked.close(); + this.currentDateTime.close(); + } + } + } + } + + @Override + protected void finalize() throws Throwable { + close(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java new file mode 100644 index 0000000..f4baeea --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java @@ -0,0 +1,380 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl.jdbc; + +import javax.sql.DataSource; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.server.ActivateCallback; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.UUID; +import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.jboss.logging.Logger; + +/** + * JDBC implementation of {@link NodeManager}. + */ +public final class JdbcNodeManager extends NodeManager { + + private static final Logger logger = Logger.getLogger(JdbcNodeManager.class); + private static final long MAX_PAUSE_MILLIS = 2000L; + + private final SharedStateManager sharedStateManager; + private final ScheduledLeaseLock scheduledLiveLock; + private final ScheduledLeaseLock scheduledBackupLock; + private final long lockRenewPeriodMillis; + private final long lockAcquisitionTimeoutMillis; + private volatile boolean interrupted = false; + private final LeaseLock.Pauser pauser; + private final IOCriticalErrorListener ioCriticalErrorListener; + + public static JdbcNodeManager with(DatabaseStorageConfiguration configuration, + ScheduledExecutorService scheduledExecutorService, + ExecutorFactory executorFactory, + IOCriticalErrorListener ioCriticalErrorListener) { + if (configuration.getDataSource() != null) { + final String brokerId = java.util.UUID.randomUUID().toString(); + return usingDataSource(brokerId, configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getDataSource(), configuration.getSqlProviderFactory().create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, executorFactory, ioCriticalErrorListener); + } else { + final String brokerId = java.util.UUID.randomUUID().toString(); + return usingConnectionUrl(brokerId, configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getJdbcConnectionUrl(), configuration.getJdbcDriverClassName(), configuration.getSqlProviderFactory().create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, executorFactory, ioCriticalErrorListener); + } + } + + static JdbcNodeManager usingDataSource(String brokerId, + long lockExpirationMillis, + long lockRenewPeriodMillis, + long lockAcquisitionTimeoutMillis, + DataSource dataSource, + SQLProvider provider, + ScheduledExecutorService scheduledExecutorService, + ExecutorFactory executorFactory, + IOCriticalErrorListener ioCriticalErrorListener) { + return new JdbcNodeManager(JdbcSharedStateManager.usingDataSource(brokerId, lockExpirationMillis, dataSource, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener); + } + + public static JdbcNodeManager usingConnectionUrl(String brokerId, + long lockExpirationMillis, + long lockRenewPeriodMillis, + long lockAcquisitionTimeoutMillis, + String jdbcUrl, + String driverClass, + SQLProvider provider, + ScheduledExecutorService scheduledExecutorService, + ExecutorFactory executorFactory, + IOCriticalErrorListener ioCriticalErrorListener) { + return new JdbcNodeManager(JdbcSharedStateManager.usingConnectionUrl(brokerId, lockExpirationMillis, jdbcUrl, driverClass, provider), false, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, executorFactory, ioCriticalErrorListener); + } + + private JdbcNodeManager(final SharedStateManager sharedStateManager, + boolean replicatedBackup, + long lockRenewPeriodMillis, + long lockAcquisitionTimeoutMillis, + ScheduledExecutorService scheduledExecutorService, + ExecutorFactory executorFactory, + IOCriticalErrorListener ioCriticalErrorListener) { + super(replicatedBackup, null); + this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis; + this.lockRenewPeriodMillis = lockRenewPeriodMillis; + this.pauser = LeaseLock.Pauser.sleep(Math.min(this.lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS); + this.sharedStateManager = sharedStateManager; + this.scheduledLiveLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory.getExecutor(), "live", this.sharedStateManager.liveLock(), lockRenewPeriodMillis, ioCriticalErrorListener); + this.scheduledBackupLock = ScheduledLeaseLock.of(scheduledExecutorService, executorFactory.getExecutor(), "backup", this.sharedStateManager.backupLock(), lockRenewPeriodMillis, ioCriticalErrorListener); + this.ioCriticalErrorListener = ioCriticalErrorListener; + } + + @Override + public synchronized void start() throws Exception { + if (isStarted()) { + return; + } + if (!replicatedBackup) { + final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID); + setUUID(nodeId); + } + + super.start(); + } + + @Override + public synchronized void stop() throws Exception { + if (isStarted()) { + try { + this.scheduledLiveLock.stop(); + this.scheduledBackupLock.stop(); + } finally { + super.stop(); + this.sharedStateManager.close(); + } + } + } + + @Override + protected void finalize() throws Throwable { + stop(); + } + + @Override + public boolean isAwaitingFailback() throws Exception { + return readSharedState() == SharedStateManager.State.FAILING_BACK; + } + + @Override + public boolean isBackupLive() throws Exception { + //is anyone holding the live lock? + return this.scheduledLiveLock.lock().isHeld(); + } + + @Override + public void stopBackup() throws Exception { + if (replicatedBackup) { + final UUID nodeId = getUUID(); + sharedStateManager.writeNodeId(nodeId); + } + releaseBackup(); + } + + @Override + public void interrupt() { + //need to be volatile: must be called concurrently to work as expected + interrupted = true; + } + + @Override + public void releaseBackup() throws Exception { + if (this.scheduledBackupLock.lock().isHeldByCaller()) { + this.scheduledBackupLock.stop(); + this.scheduledBackupLock.lock().release(); + } + } + + /** + * Try to acquire a lock, failing with an exception otherwise. + */ + private void lock(LeaseLock lock) throws Exception { + final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(this.lockAcquisitionTimeoutMillis, this.pauser, () -> !this.interrupted); + switch (acquireResult) { + case Timeout: + throw new Exception("timed out waiting for lock"); + case Exit: + this.interrupted = false; + throw new InterruptedException("LeaseLock was interrupted"); + case Done: + break; + default: + throw new AssertionError(acquireResult + " not managed"); + } + + } + + private void checkInterrupted(Supplier<String> message) throws InterruptedException { + if (this.interrupted) { + interrupted = false; + throw new InterruptedException(message.get()); + } + } + + private void renewLiveLockIfNeeded(final long acquiredOn) { + final long acquiredMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - acquiredOn); + if (acquiredMillis > this.scheduledLiveLock.renewPeriodMillis()) { + if (!this.scheduledLiveLock.lock().renew()) { + final IllegalStateException e = new IllegalStateException("live lock can't be renewed"); + try { + ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null); + } finally { + throw e; + } + } + } + } + + /** + * Lock live node and check for a live state, taking care to renew it (if needed) or releasing it otherwise + */ + private boolean lockLiveAndCheckLiveState() throws Exception { + lock(this.scheduledLiveLock.lock()); + final long acquiredOn = System.nanoTime(); + boolean liveWhileLocked = false; + //check if the state is live + final SharedStateManager.State stateWhileLocked; + try { + stateWhileLocked = readSharedState(); + } catch (Throwable t) { + logger.error("error while holding the live node lock and tried to read the shared state", t); + this.scheduledLiveLock.lock().release(); + throw t; + } + if (stateWhileLocked == SharedStateManager.State.LIVE) { + renewLiveLockIfNeeded(acquiredOn); + liveWhileLocked = true; + } else { + if (logger.isDebugEnabled()) { + logger.debug("state is " + stateWhileLocked + " while holding the live lock"); + } + //state is not live: can (try to) release the lock + this.scheduledLiveLock.lock().release(); + } + return liveWhileLocked; + } + + @Override + public void awaitLiveNode() throws Exception { + boolean liveWhileLocked = false; + while (!liveWhileLocked) { + //check first without holding any lock + final SharedStateManager.State state = readSharedState(); + if (state == SharedStateManager.State.LIVE) { + //verify if the state is live while holding the live node lock too + liveWhileLocked = lockLiveAndCheckLiveState(); + } else { + if (logger.isDebugEnabled()) { + logger.debug("awaiting live node...state: " + state); + } + } + if (!liveWhileLocked) { + checkInterrupted(() -> "awaitLiveNode got interrupted!"); + pauser.idle(); + } + } + //state is LIVE and live lock is acquired and valid + logger.debug("acquired live node lock"); + this.scheduledLiveLock.start(); + } + + @Override + public void startBackup() throws Exception { + assert !replicatedBackup; // should not be called if this is a replicating backup + ActiveMQServerLogger.LOGGER.waitingToBecomeBackup(); + + lock(scheduledBackupLock.lock()); + scheduledBackupLock.start(); + ActiveMQServerLogger.LOGGER.gotBackupLock(); + if (getUUID() == null) + readNodeId(); + } + + @Override + public ActivateCallback startLiveNode() throws Exception { + setFailingBack(); + + final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds"; + + ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage); + + lock(this.scheduledLiveLock.lock()); + + this.scheduledLiveLock.start(); + + ActiveMQServerLogger.LOGGER.obtainedLiveLock(); + + return new ActivateCallback() { + @Override + public void preActivate() { + } + + @Override + public void activated() { + } + + @Override + public void deActivate() { + } + + @Override + public void activationComplete() { + try { + //state can be written only if the live renew task is running + setLive(); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + } + } + }; + } + + @Override + public void pauseLiveServer() throws Exception { + if (scheduledLiveLock.isStarted()) { + setPaused(); + scheduledLiveLock.stop(); + scheduledLiveLock.lock().release(); + } else if (scheduledLiveLock.lock().renew()) { + setPaused(); + scheduledLiveLock.lock().release(); + } else { + final IllegalStateException e = new IllegalStateException("live lock can't be renewed"); + try { + ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null); + } finally { + throw e; + } + } + } + + @Override + public void crashLiveServer() throws Exception { + if (this.scheduledLiveLock.lock().isHeldByCaller()) { + scheduledLiveLock.stop(); + this.scheduledLiveLock.lock().release(); + } + } + + @Override + public void awaitLiveStatus() { + while (readSharedState() != SharedStateManager.State.LIVE) { + pauser.idle(); + } + } + + private void setLive() { + writeSharedState(SharedStateManager.State.LIVE); + } + + private void setFailingBack() { + writeSharedState(SharedStateManager.State.FAILING_BACK); + } + + private void setPaused() { + writeSharedState(SharedStateManager.State.PAUSED); + } + + private void writeSharedState(SharedStateManager.State state) { + assert !this.replicatedBackup : "the replicated backup can't write the shared state!"; + this.sharedStateManager.writeState(state); + } + + private SharedStateManager.State readSharedState() { + return this.sharedStateManager.readState(); + } + + @Override + public SimpleString readNodeId() { + final UUID nodeId = this.sharedStateManager.readNodeId(); + setUUID(nodeId); + return getNodeId(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java new file mode 100644 index 0000000..dad1abc --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl.jdbc; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.function.Supplier; + +import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.apache.activemq.artemis.utils.UUID; + +/** + * JDBC implementation of a {@link SharedStateManager}. + */ +@SuppressWarnings("SynchronizeOnNonFinalField") +final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager { + + private final String holderId; + private final long lockExpirationMillis; + private JdbcLeaseLock liveLock; + private JdbcLeaseLock backupLock; + private PreparedStatement readNodeId; + private PreparedStatement writeNodeId; + private PreparedStatement readState; + private PreparedStatement writeState; + + public static JdbcSharedStateManager usingDataSource(String holderId, + long locksExpirationMillis, + DataSource dataSource, + SQLProvider provider) { + final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis); + sharedStateManager.setDataSource(dataSource); + sharedStateManager.setSqlProvider(provider); + try { + sharedStateManager.start(); + return sharedStateManager; + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + + public static JdbcSharedStateManager usingConnectionUrl(String holderId, + long locksExpirationMillis, + String jdbcConnectionUrl, + String jdbcDriverClass, + SQLProvider provider) { + final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis); + sharedStateManager.setJdbcConnectionUrl(jdbcConnectionUrl); + sharedStateManager.setJdbcDriverClass(jdbcDriverClass); + sharedStateManager.setSqlProvider(provider); + try { + sharedStateManager.start(); + return sharedStateManager; + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + + @Override + protected void createSchema() throws SQLException { + try { + createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL()); + } catch (SQLException e) { + //no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized + } + } + + static JdbcLeaseLock createLiveLock(String holderId, + Connection connection, + SQLProvider sqlProvider, + long expirationMillis, + long maxAllowableMillisDiffFromDBtime) throws SQLException { + return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireLiveLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseLiveLockSQL()), connection.prepareStatement(sqlProvider.renewLiveLockSQL()), connection.prepareStatement(sqlProvider.isLiveLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, maxAllowableMillisDiffFromDBtime); + } + + static JdbcLeaseLock createBackupLock(String holderId, + Connection connection, + SQLProvider sqlProvider, + long expirationMillis, + long maxAllowableMillisDiffFromDBtime) throws SQLException { + return new JdbcLeaseLock(holderId, connection, connection.prepareStatement(sqlProvider.tryAcquireBackupLockSQL()), connection.prepareStatement(sqlProvider.tryReleaseBackupLockSQL()), connection.prepareStatement(sqlProvider.renewBackupLockSQL()), connection.prepareStatement(sqlProvider.isBackupLockedSQL()), connection.prepareStatement(sqlProvider.currentTimestampSQL()), expirationMillis, maxAllowableMillisDiffFromDBtime); + } + + @Override + protected void prepareStatements() throws SQLException { + this.liveLock = createLiveLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0); + this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0); + this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL()); + this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL()); + this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL()); + this.readState = connection.prepareStatement(sqlProvider.readStateSQL()); + } + + private JdbcSharedStateManager(String holderId, long lockExpirationMillis) { + this.holderId = holderId; + this.lockExpirationMillis = lockExpirationMillis; + } + + @Override + public LeaseLock liveLock() { + return this.liveLock; + } + + @Override + public LeaseLock backupLock() { + return this.backupLock; + } + + private UUID rawReadNodeId() throws SQLException { + final PreparedStatement preparedStatement = this.readNodeId; + try (ResultSet resultSet = preparedStatement.executeQuery()) { + if (!resultSet.next()) { + return null; + } else { + final String nodeId = resultSet.getString(1); + if (nodeId != null) { + return new UUID(UUID.TYPE_TIME_BASED, UUID.stringToBytes(nodeId)); + } else { + return null; + } + } + } + } + + @Override + public UUID readNodeId() { + synchronized (connection) { + try { + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + connection.setAutoCommit(true); + final UUID nodeId = rawReadNodeId(); + return nodeId; + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + } + + @Override + public void writeNodeId(UUID nodeId) { + synchronized (connection) { + try { + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + connection.setAutoCommit(true); + rawWriteNodeId(nodeId); + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + } + + private void rawWriteNodeId(UUID nodeId) throws SQLException { + final PreparedStatement preparedStatement = this.writeNodeId; + preparedStatement.setString(1, nodeId.toString()); + if (preparedStatement.executeUpdate() != 1) { + throw new IllegalStateException("can't write NODE_ID on the JDBC Node Manager Store!"); + } + } + + @Override + public UUID setup(Supplier<? extends UUID> nodeIdFactory) { + //uses a single transaction to make everything + synchronized (connection) { + try { + final UUID nodeId; + connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); + connection.setAutoCommit(false); + try { + UUID readNodeId = rawReadNodeId(); + if (readNodeId == null) { + nodeId = nodeIdFactory.get(); + rawWriteNodeId(nodeId); + } else { + nodeId = readNodeId; + } + } catch (SQLException e) { + connection.rollback(); + connection.setAutoCommit(true); + throw e; + } + connection.commit(); + connection.setAutoCommit(true); + return nodeId; + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + } + + private static State decodeState(String s) { + if (s == null) { + return State.NOT_STARTED; + } + switch (s) { + case "L": + return State.LIVE; + case "F": + return State.FAILING_BACK; + case "P": + return State.PAUSED; + case "N": + return State.NOT_STARTED; + default: + throw new IllegalStateException("unknown state [" + s + "]"); + } + } + + private static String encodeState(State state) { + switch (state) { + case LIVE: + return "L"; + case FAILING_BACK: + return "F"; + case PAUSED: + return "P"; + case NOT_STARTED: + return "N"; + default: + throw new IllegalStateException("unknown state [" + state + "]"); + } + } + + @Override + public State readState() { + synchronized (connection) { + try { + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + connection.setAutoCommit(true); + final State state; + final PreparedStatement preparedStatement = this.readState; + try (ResultSet resultSet = preparedStatement.executeQuery()) { + if (!resultSet.next()) { + state = State.FIRST_TIME_START; + } else { + state = decodeState(resultSet.getString(1)); + } + } + return state; + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + } + + @Override + public void writeState(State state) { + final String encodedState = encodeState(state); + synchronized (connection) { + try { + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + connection.setAutoCommit(true); + final PreparedStatement preparedStatement = this.writeState; + preparedStatement.setString(1, encodedState); + if (preparedStatement.executeUpdate() != 1) { + throw new IllegalStateException("can't write STATE to the JDBC Node Manager Store!"); + } + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + } + + @Override + public void stop() throws SQLException { + //release all the managed resources inside the connection lock + if (sqlProvider.closeConnectionOnShutdown()) { + synchronized (connection) { + this.readNodeId.close(); + this.writeNodeId.close(); + this.readState.close(); + this.writeState.close(); + this.liveLock.close(); + this.backupLock.close(); + super.stop(); + } + } + } + + @Override + public void close() throws SQLException { + stop(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java new file mode 100644 index 0000000..8deda12 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl.jdbc; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +/** + * It represents a lock that can't be held more than {@link #expirationMillis()} without being renewed. + * + * <p> + * An implementor must provide implicitly the caller identity to contextualize each operation (eg {@link JdbcLeaseLock} + * uses one caller per instance) + */ +interface LeaseLock extends AutoCloseable { + + enum AcquireResult { + Timeout, Exit, Done + } + + interface ExitCondition { + + /** + * @return true as long as we should keep running + */ + boolean keepRunning(); + } + + interface Pauser { + + void idle(); + + static Pauser sleep(long idleTime, TimeUnit timeUnit) { + final long idleNanos = timeUnit.toNanos(idleTime); + //can fail spuriously but doesn't throw any InterruptedException + return () -> LockSupport.parkNanos(idleNanos); + } + + static Pauser noWait() { + return () -> { + }; + } + } + + /** + * The expiration in milliseconds from the last valid acquisition/renew. + */ + default long expirationMillis() { + return Long.MAX_VALUE; + } + + /** + * It extends the lock expiration (if held) to {@link System#currentTimeMillis()} + {@link #expirationMillis()}. + * + * @return {@code true} if the expiration has been moved on, {@code false} otherwise + */ + default boolean renew() { + return true; + } + + /** + * Not reentrant lock acquisition operation. + * The lock can be acquired if is not held by anyone (including the caller) or has an expired ownership. + * + * @return {@code true} if has been acquired, {@code false} otherwise + */ + boolean tryAcquire(); + + /** + * Not reentrant lock acquisition operation (ie {@link #tryAcquire()}). + * It tries to acquire the lock until will succeed (ie {@link AcquireResult#Done})or got interrupted (ie {@link AcquireResult#Exit}). + * After each failed attempt is performed a {@link Pauser#idle} call. + */ + default AcquireResult tryAcquire(ExitCondition exitCondition, Pauser pauser) { + while (exitCondition.keepRunning()) { + if (tryAcquire()) { + return AcquireResult.Done; + } else { + pauser.idle(); + } + } + return AcquireResult.Exit; + } + + /** + * Not reentrant lock acquisition operation (ie {@link #tryAcquire()}). + * It tries to acquire the lock until will succeed (ie {@link AcquireResult#Done}), got interrupted (ie {@link AcquireResult#Exit}) + * or exceed {@code tryAcquireTimeoutMillis}. + * After each failed attempt is performed a {@link Pauser#idle} call. + * If the specified timeout is <=0 then it behaves as {@link #tryAcquire(ExitCondition, Pauser)}. + */ + default AcquireResult tryAcquire(long tryAcquireTimeoutMillis, Pauser pauser, ExitCondition exitCondition) { + if (tryAcquireTimeoutMillis < 0) { + return tryAcquire(exitCondition, pauser); + } + final long timeoutInNanosecond = TimeUnit.MILLISECONDS.toNanos(tryAcquireTimeoutMillis); + final long startAcquire = System.nanoTime(); + while (exitCondition.keepRunning()) { + if (tryAcquire()) { + return AcquireResult.Done; + } else if (System.nanoTime() - startAcquire >= timeoutInNanosecond) { + return AcquireResult.Timeout; + } else { + pauser.idle(); + //check before doing anything if time is expired + if (System.nanoTime() - startAcquire >= timeoutInNanosecond) { + return AcquireResult.Timeout; + } + } + } + return AcquireResult.Exit; + } + + /** + * @return {@code true} if there is a valid (ie not expired) owner, {@code false} otherwise + */ + boolean isHeld(); + + /** + * @return {@code true} if the caller is a valid (ie not expired) owner, {@code false} otherwise + */ + boolean isHeldByCaller(); + + /** + * It release the lock itself and all the resources used by it (eg connections, file handlers) + */ + @Override + default void close() throws Exception { + release(); + } + + /** + * Perform the release if this lock is held by the caller. + */ + void release(); +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java new file mode 100644 index 0000000..43751f8 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl.jdbc; + +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; + +/** + * {@link LeaseLock} holder that allows to schedule a {@link LeaseLock#renew} task with a fixed {@link #renewPeriodMillis()} delay. + */ +interface ScheduledLeaseLock extends ActiveMQComponent { + + LeaseLock lock(); + + long renewPeriodMillis(); + + static ScheduledLeaseLock of(ScheduledExecutorService scheduledExecutorService, + ArtemisExecutor executor, + String lockName, + LeaseLock lock, + long renewPeriodMillis, + IOCriticalErrorListener ioCriticalErrorListener) { + return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, ioCriticalErrorListener); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java new file mode 100644 index 0000000..e26879c --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl.jdbc; + +import java.util.function.Supplier; + +import org.apache.activemq.artemis.utils.UUID; + +/** + * Facade to abstract the operations on the shared state (inter-process and/or inter-thread) necessary to coordinate broker nodes. + */ +interface SharedStateManager extends AutoCloseable { + + enum State { + LIVE, PAUSED, FAILING_BACK, NOT_STARTED, FIRST_TIME_START + } + + LeaseLock liveLock(); + + LeaseLock backupLock(); + + UUID readNodeId(); + + void writeNodeId(UUID nodeId); + + /** + * Purpose of this method is to setup the environment to provide a shared state between live/backup servers. + * That means: + * - check if a shared state exist and create it/wait for it if not + * - check if a nodeId exists and create it if not + * + * @param nodeIdFactory used to create the nodeId if needed + * @return the newly created NodeId or the old one if already present + */ + UUID setup(Supplier<? extends UUID> nodeIdFactory); + + State readState(); + + void writeState(State state); + + @Override + default void close() throws Exception { + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/main/resources/schema/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index b00298a..81f13bf 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1722,6 +1722,27 @@ </xsd:documentation> </xsd:annotation> </xsd:element> + <xsd:element name="jdbc-lock-acquisition-timeout" type="xsd:int" minOccurs="0" maxOccurs="1"> + <xsd:annotation> + <xsd:documentation> + The max allowed time in milliseconds while trying to acquire a JDBC lock. + </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="jdbc-lock-renew-period" type="xsd:int" minOccurs="0" maxOccurs="1"> + <xsd:annotation> + <xsd:documentation> + The period in milliseconds of the keep alive service of a JDBC lock. + </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="jdbc-lock-expiration" type="xsd:int" minOccurs="0" maxOccurs="1"> + <xsd:annotation> + <xsd:documentation> + The time in milliseconds a JDBC lock is considered valid without keeping it alive. + </xsd:documentation> + </xsd:annotation> + </xsd:element> <xsd:element name="jms-bindings-table-name" type="xsd:string" minOccurs="1" maxOccurs="1"> <xsd:annotation> <xsd:documentation>
