ARTEMIS-1541 Make the JDBC Node Manager more resilient on failures

(cherry picked from commit 70b21725edae28b591b87bb4de0f51364e9cfd50)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f005da6d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f005da6d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f005da6d

Branch: refs/heads/1.x
Commit: f005da6dfdf4d51b244e360abdf1906f7d45670d
Parents: a0f3da5
Author: Francesco Nigro <[email protected]>
Authored: Wed Dec 6 11:28:59 2017 +0100
Committer: Clebert Suconic <[email protected]>
Committed: Wed Mar 28 11:54:15 2018 -0400

----------------------------------------------------------------------
 .../jdbc/store/sql/GenericSQLProvider.java      |  9 ++
 .../artemis/jdbc/store/sql/SQLProvider.java     |  2 +
 .../impl/jdbc/ActiveMQScheduledLeaseLock.java   |  9 +-
 .../core/server/impl/jdbc/JdbcNodeManager.java  | 26 ++++--
 .../impl/jdbc/JdbcSharedStateManager.java       | 87 +++++++++++++++-----
 .../server/impl/jdbc/SharedStateManager.java    |  1 +
 6 files changed, 104 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f005da6d/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
index ac793d3..c15ce18 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java
@@ -99,6 +99,8 @@ public class GenericSQLProvider implements SQLProvider {
 
    private final String writeNodeIdSQL;
 
+   private final String initializeNodeIdSQL;
+
    private final String readNodeIdSQL;
 
    protected final DatabaseStoreType databaseStoreType;
@@ -176,6 +178,8 @@ public class GenericSQLProvider implements SQLProvider {
 
       writeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE ID = " + NODE_ID_ROW_ID;
 
+      initializeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE NODE_ID IS NULL AND ID = " + NODE_ID_ROW_ID;
+
       readNodeIdSQL = "SELECT NODE_ID FROM " + tableName + " WHERE ID = " + NODE_ID_ROW_ID;
 
    }
@@ -368,6 +372,11 @@ public class GenericSQLProvider implements SQLProvider {
    }
 
    @Override
+   public String initializeNodeIdSQL() {
+      return initializeNodeIdSQL;
+   }
+
+   @Override
    public boolean closeConnectionOnShutdown() {
       return true;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f005da6d/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
index b4b55d5..66af24b 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java
@@ -96,6 +96,8 @@ public interface SQLProvider {
 
    String writeNodeIdSQL();
 
+   String initializeNodeIdSQL();
+
    String readNodeIdSQL();
 
    interface Factory {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f005da6d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
index 4a281a2..c5cda70 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java
@@ -87,8 +87,13 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
    public void run() {
       final long lastRenewStart = this.lastLockRenewStart;
       final long renewStart = System.nanoTime();
-      if (!this.lock.renew()) {
-         ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null);
+      try {
+         if (!this.lock.renew()) {
+            ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null);
+         }
+      } catch (Throwable t) {
+         ioCriticalErrorListener.onIOException(t, "Critical error while on " + lockName + " renew", null);
+         throw t;
       }
       //logic to detect slowness of DB and/or the scheduled executor service
       detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f005da6d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
index 8cb852d..7bda51e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java
@@ -111,16 +111,24 @@ public final class JdbcNodeManager extends NodeManager {
    }
 
    @Override
-   public synchronized void start() throws Exception {
-      if (isStarted()) {
-         return;
-      }
-      if (!replicatedBackup) {
-         final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
-         setUUID(nodeId);
+   public void start() throws Exception {
+      try {
+         synchronized (this) {
+            if (isStarted()) {
+               return;
+            }
+            if (!replicatedBackup) {
+               final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
+               setUUID(nodeId);
+            }
+            super.start();
+         }
+      } catch (IllegalStateException e) {
+         if (this.ioCriticalErrorListener != null) {
+            this.ioCriticalErrorListener.onIOException(e, "Failed to setup the JdbcNodeManager", null);
+         }
+         throw e;
       }
-
-      super.start();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f005da6d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
index dad1abc..f1e0554 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java
@@ -27,6 +27,7 @@ import java.util.function.Supplier;
 import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.utils.UUID;
+import org.jboss.logging.Logger;
 
 /**
  * JDBC implementation of a {@link SharedStateManager}.
@@ -34,12 +35,15 @@ import org.apache.activemq.artemis.utils.UUID;
 @SuppressWarnings("SynchronizeOnNonFinalField")
 final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedStateManager {
 
+   private static final Logger logger = Logger.getLogger(JdbcSharedStateManager.class);
+   public static final int MAX_SETUP_ATTEMPTS = 20;
    private final String holderId;
    private final long lockExpirationMillis;
    private JdbcLeaseLock liveLock;
    private JdbcLeaseLock backupLock;
    private PreparedStatement readNodeId;
    private PreparedStatement writeNodeId;
+   private PreparedStatement initializeNodeId;
    private PreparedStatement readState;
    private PreparedStatement writeState;
 
@@ -81,6 +85,9 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
          createTable(sqlProvider.createNodeManagerStoreTableSQL(), sqlProvider.createNodeIdSQL(), sqlProvider.createStateSQL(), sqlProvider.createLiveLockSQL(), sqlProvider.createBackupLockSQL());
       } catch (SQLException e) {
          //no op: if a table already exists is not a problem in this case, the prepareStatements() call will fail right after it if the table is not correctly initialized
+         if (logger.isDebugEnabled()) {
+            logger.debug("Error while creating the schema of the JDBC shared state manager", e);
+         }
       }
    }
 
@@ -106,6 +113,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
       this.backupLock = createBackupLock(this.holderId, this.connection, sqlProvider, lockExpirationMillis, 0);
       this.readNodeId = connection.prepareStatement(sqlProvider.readNodeIdSQL());
       this.writeNodeId = connection.prepareStatement(sqlProvider.writeNodeIdSQL());
+      this.initializeNodeId = connection.prepareStatement(sqlProvider.initializeNodeIdSQL());
       this.writeState = connection.prepareStatement(sqlProvider.writeStateSQL());
       this.readState = connection.prepareStatement(sqlProvider.readStateSQL());
    }
@@ -176,34 +184,74 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
       }
    }
 
+   private boolean rawInitializeNodeId(UUID nodeId) throws SQLException {
+      final PreparedStatement preparedStatement = this.initializeNodeId;
+      preparedStatement.setString(1, nodeId.toString());
+      final int rows = preparedStatement.executeUpdate();
+      assert rows <= 1;
+      return rows > 0;
+   }
+
    @Override
    public UUID setup(Supplier<? extends UUID> nodeIdFactory) {
-      //uses a single transaction to make everything
+      SQLException lastError = null;
       synchronized (connection) {
-         try {
-            final UUID nodeId;
-            connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
-            connection.setAutoCommit(false);
+         final UUID newNodeId = nodeIdFactory.get();
+         for (int attempts = 0; attempts < MAX_SETUP_ATTEMPTS; attempts++) {
+            lastError = null;
             try {
-               UUID readNodeId = rawReadNodeId();
-               if (readNodeId == null) {
-                  nodeId = nodeIdFactory.get();
-                  rawWriteNodeId(nodeId);
-               } else {
-                  nodeId = readNodeId;
+               final UUID nodeId = initializeOrReadNodeId(newNodeId);
+               if (nodeId != null) {
+                  return nodeId;
                }
             } catch (SQLException e) {
-               connection.rollback();
-               connection.setAutoCommit(true);
-               throw e;
+               if (logger.isDebugEnabled()) {
+                  logger.debug("Error while attempting to setup the NodeId", e);
+               }
+               lastError = e;
             }
-            connection.commit();
-            connection.setAutoCommit(true);
-            return nodeId;
-         } catch (SQLException e) {
-            throw new IllegalStateException(e);
          }
       }
+      if (lastError != null) {
+         logger.error("Unable to setup a NodeId on the JDBC shared state", lastError);
+      } else {
+         logger.error("Unable to setup a NodeId on the JDBC shared state");
+      }
+      throw new IllegalStateException("FAILED TO SETUP the JDBC Shared State NodeId");
+   }
+
+   private UUID initializeOrReadNodeId(final UUID newNodeId) throws SQLException {
+      final UUID nodeId;
+      connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+      connection.setAutoCommit(false);
+      try {
+         //optimistic try to initialize nodeId
+         if (rawInitializeNodeId(newNodeId)) {
+            nodeId = newNodeId;
+         } else {
+            nodeId = rawReadNodeId();
+         }
+      } catch (SQLException e) {
+         connection.rollback();
+         connection.setAutoCommit(true);
+         if (logger.isDebugEnabled()) {
+            logger.debug("Rollback while trying to update NodeId to " + newNodeId, e);
+         }
+         return null;
+      }
+      if (nodeId != null) {
+         connection.commit();
+         connection.setAutoCommit(true);
+         return nodeId;
+      } else {
+         //that means that the rawInitializeNodeId has failed just due to contention or the nodeId wasn't committed yet
+         connection.rollback();
+         connection.setAutoCommit(true);
+         if (logger.isDebugEnabled()) {
+            logger.debug("Rollback after failed to update NodeId to " + newNodeId + " and haven't found any NodeId");
+         }
+         return null;
+      }
    }
 
    private static State decodeState(String s) {
@@ -286,6 +334,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
          synchronized (connection) {
             this.readNodeId.close();
             this.writeNodeId.close();
+            this.initializeNodeId.close();
             this.readState.close();
             this.writeState.close();
             this.liveLock.close();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f005da6d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java
index e26879c..0b2d5fb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java
@@ -46,6 +46,7 @@ interface SharedStateManager extends AutoCloseable {
     *
     * @param nodeIdFactory used to create the nodeId if needed
     * @return the newly created NodeId or the old one if already present
+    * @throws IllegalStateException if not able to setup the NodeId properly
     */
    UUID setup(Supplier<? extends UUID> nodeIdFactory);
 

Reply via email to