Repository: activemq
Updated Branches:
  refs/heads/master 9e856290c -> 03a211ec0
AMQ-6317: Use an SQL Statement for each createSchemaStatement closes #190 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/03a211ec Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/03a211ec Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/03a211ec Branch: refs/heads/master Commit: 03a211ec061d9ed49dc1ac16f171d8f4458483b8 Parents: 9e85629 Author: Jeroen Bastijns <[email protected]> Authored: Thu Jun 9 11:09:18 2016 +0200 Committer: gtully <[email protected]> Committed: Tue Jun 28 15:57:17 2016 +0100 ---------------------------------------------------------------------- activemq-jdbc-store/pom.xml | 26 +++ .../store/jdbc/adapter/DefaultJDBCAdapter.java | 109 +++++------ .../DefaultJDBCAdapterDoCreateTablesTest.java | 181 +++++++++++++++++++ 3 files changed, 265 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/03a211ec/activemq-jdbc-store/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/pom.xml b/activemq-jdbc-store/pom.xml index 9e3ba0f..a22d657 100755 --- a/activemq-jdbc-store/pom.xml +++ b/activemq-jdbc-store/pom.xml @@ -49,6 +49,32 @@ <artifactId>activeio-core</artifactId> <optional>true</optional> </dependency> + + <!-- =============================== --> + <!-- Testing Dependencies --> + <!-- =============================== --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-broker</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + 
<artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> <reporting> http://git-wip-us.apache.org/repos/asf/activemq/blob/03a211ec/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index facf969..57438bc 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.store.jdbc.adapter; +import static javax.xml.bind.DatatypeConverter.parseBase64Binary; +import static javax.xml.bind.DatatypeConverter.printBase64Binary; + import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; @@ -37,7 +40,6 @@ import org.apache.activemq.command.XATransactionId; import org.apache.activemq.store.jdbc.JDBCAdapter; import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener; import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener; -import org.apache.activemq.store.jdbc.JDBCMessageStore; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore; import org.apache.activemq.store.jdbc.Statements; @@ -46,9 +48,6 @@ import org.apache.activemq.util.DataByteArrayOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static javax.xml.bind.DatatypeConverter.parseBase64Binary; -import static javax.xml.bind.DatatypeConverter.printBase64Binary; - /** * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. 
<p/> sub-classing is * encouraged to override the default implementation of methods to account for differences in JDBC Driver @@ -65,6 +64,7 @@ import static javax.xml.bind.DatatypeConverter.printBase64Binary; public class DefaultJDBCAdapter implements JDBCAdapter { private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class); public static final int MAX_ROWS = org.apache.activemq.ActiveMQPrefetchPolicy.MAX_PREFETCH_SIZE; + private static final String FAILURE_MESSAGE = "Failure was: %s Message: %s SQLState: %s Vendor code: %s"; protected Statements statements; private boolean batchStatements = true; //This is deprecated and should be removed in a future release @@ -82,58 +82,68 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } @Override - public void doCreateTables(TransactionContext c) throws SQLException, IOException { - Statement s = null; + public void doCreateTables(TransactionContext transactionContext) throws SQLException, IOException { cleanupExclusiveLock.writeLock().lock(); try { - // Check to see if the table already exists. If it does, then don't - // log warnings during startup. - // Need to run the scripts anyways since they may contain ALTER - // statements that upgrade a previous version - // of the table - boolean alreadyExists = false; - ResultSet rs = null; - try { - rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(), - new String[] { "TABLE" }); - alreadyExists = rs.next(); - } catch (Throwable ignore) { - } finally { - close(rs); - } - s = c.getConnection().createStatement(); - String[] createStatments = this.statements.getCreateSchemaStatements(); - for (int i = 0; i < createStatments.length; i++) { + // Check to see if the table already exists. If it does, then don't log warnings during startup. 
+ // Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version of the table + boolean messageTableAlreadyExists = messageTableAlreadyExists(transactionContext); + + for (String createStatement : this.statements.getCreateSchemaStatements()) { // This will fail usually since the tables will be // created already. - try { - LOG.debug("Executing SQL: " + createStatments[i]); - s.execute(createStatments[i]); - } catch (SQLException e) { - if (alreadyExists) { - LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: " - + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() - + " Vendor code: " + e.getErrorCode()); - } else { - LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: " - + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() - + " Vendor code: " + e.getErrorCode()); - JDBCPersistenceAdapter.log("Failure details: ", e); - } - } - } - - // if autoCommit used do not call commit - if(!c.getConnection().getAutoCommit()){ - c.getConnection().commit(); + executeStatement(transactionContext, createStatement, messageTableAlreadyExists); } } finally { cleanupExclusiveLock.writeLock().unlock(); - try { - s.close(); - } catch (Throwable e) { + } + } + + private boolean messageTableAlreadyExists(TransactionContext transactionContext) { + boolean alreadyExists = false; + ResultSet rs = null; + try { + rs = transactionContext.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(), new String[] { "TABLE" }); + alreadyExists = rs.next(); + } catch (Throwable ignore) { + } finally { + close(rs); + } + return alreadyExists; + } + + private void executeStatement(TransactionContext transactionContext, String createStatement, boolean ignoreStatementExecutionFailure) throws IOException { + Statement statement = null; + try { + LOG.debug("Executing SQL: " + 
createStatement); + statement = transactionContext.getConnection().createStatement(); + statement.execute(createStatement); + + commitIfAutoCommitIsDisabled(transactionContext); + } catch (SQLException e) { + if (ignoreStatementExecutionFailure) { + LOG.debug("Could not create JDBC tables; The message table already existed. " + String.format(FAILURE_MESSAGE, createStatement, e.getMessage(), e.getSQLState(), e.getErrorCode())); + } else { + LOG.warn("Could not create JDBC tables; they could already exist. " + String.format(FAILURE_MESSAGE, createStatement, e.getMessage(), e.getSQLState(), e.getErrorCode())); + JDBCPersistenceAdapter.log("Failure details: ", e); + } + } finally { + closeStatement(statement); + } + } + + private void closeStatement(Statement statement) { + try { + if (statement != null) { + statement.close(); } + } catch (SQLException ignored) {} + } + + private void commitIfAutoCommitIsDisabled(TransactionContext c) throws SQLException, IOException { + if (!c.getConnection().getAutoCommit()) { + c.getConnection().commit(); } } @@ -157,10 +167,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { JDBCPersistenceAdapter.log("Failure details: ", e); } } - // if autoCommit used do not call commit - if(!c.getConnection().getAutoCommit()){ - c.getConnection().commit(); - } + commitIfAutoCommitIsDisabled(c); } finally { cleanupExclusiveLock.writeLock().unlock(); try { http://git-wip-us.apache.org/repos/asf/activemq/blob/03a211ec/activemq-jdbc-store/src/test/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapterDoCreateTablesTest.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/test/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapterDoCreateTablesTest.java b/activemq-jdbc-store/src/test/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapterDoCreateTablesTest.java new file mode 100644 index 0000000..39dc0e9 --- /dev/null +++ 
b/activemq-jdbc-store/src/test/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapterDoCreateTablesTest.java @@ -0,0 +1,181 @@ +package org.apache.activemq.store.jdbc.adapter; + +import static org.apache.log4j.Level.DEBUG; +import static org.apache.log4j.Level.WARN; +import static org.junit.Assert.assertEquals; +import static org.mockito.Answers.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; + +import org.apache.activemq.store.jdbc.Statements; +import org.apache.activemq.store.jdbc.TransactionContext; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class DefaultJDBCAdapterDoCreateTablesTest { + + private static final String CREATE_STATEMENT1 = "createStatement1"; + private static final String CREATE_STATEMENT2 = "createStatement2"; + private static final String[] CREATE_STATEMENTS = new String[] { CREATE_STATEMENT1, CREATE_STATEMENT2 }; + private static final int VENDOR_CODE = 1; + private static final String SQL_STATE = "SqlState"; + private static final String MY_REASON = "MyReason"; + + private DefaultJDBCAdapter defaultJDBCAdapter; + + private List<LoggingEvent> loggingEvents = new ArrayList<>(); + + @Mock + private ReadWriteLock readWriteLock; + + @Mock + private Lock lock; + + @Mock + private TransactionContext transactionContext; + + 
@Mock(answer = RETURNS_DEEP_STUBS) + private Connection connection; + + @Mock + private Statements statements; + + @Mock + private ResultSet resultSet; + + @Mock + private Statement statement1, statement2; + + @Before + public void setUp() throws IOException, SQLException { + DefaultTestAppender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + loggingEvents.add(event); + } + }; + Logger rootLogger = Logger.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + rootLogger.addAppender(appender); + + + defaultJDBCAdapter = new DefaultJDBCAdapter(); + defaultJDBCAdapter.cleanupExclusiveLock = readWriteLock; + defaultJDBCAdapter.statements = statements; + + when(statements.getCreateSchemaStatements()).thenReturn(CREATE_STATEMENTS); + when(transactionContext.getConnection()).thenReturn(connection); + when(connection.getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),new String[] { "TABLE" })).thenReturn(resultSet); + when(connection.createStatement()).thenReturn(statement1, statement2); + when(connection.getAutoCommit()).thenReturn(true); + when(readWriteLock.writeLock()).thenReturn(lock); + } + + @After + public void tearDown() { + loggingEvents = new ArrayList<>(); + } + + @Test + public void createsTheTablesWhenNoMessageTableExistsAndLogsSqlExceptionsInWarnLevel() throws IOException, SQLException { + when(resultSet.next()).thenReturn(false); + when(statement2.execute(CREATE_STATEMENT2)).thenThrow(new SQLException(MY_REASON, SQL_STATE, VENDOR_CODE)); + + defaultJDBCAdapter.doCreateTables(transactionContext); + + InOrder inOrder = inOrder(lock, resultSet, connection, statement1, statement2); + inOrder.verify(lock).lock(); + inOrder.verify(resultSet).next(); + inOrder.verify(resultSet).close(); + inOrder.verify(connection).createStatement(); + inOrder.verify(statement1).execute(CREATE_STATEMENT1); + inOrder.verify(statement1).close(); + inOrder.verify(connection).createStatement(); + 
inOrder.verify(statement2).execute(CREATE_STATEMENT2); + inOrder.verify(statement2).close(); + inOrder.verify(lock).unlock(); + + assertEquals(4, loggingEvents.size()); + assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1); + assertLog(1, DEBUG, "Executing SQL: " + CREATE_STATEMENT2); + assertLog(2, WARN, "Could not create JDBC tables; they could already exist. Failure was: " + CREATE_STATEMENT2 + " Message: " + MY_REASON + " SQLState: " + SQL_STATE + " Vendor code: " + VENDOR_CODE); + assertLog(3, WARN, "Failure details: " + MY_REASON); + } + + @Test + public void triesTocreateTheTablesWhenMessageTableExistsAndLogsSqlExceptionsInDebugLevel() throws SQLException, IOException { + when(resultSet.next()).thenReturn(true); + when(statement1.execute(CREATE_STATEMENT1)).thenThrow(new SQLException(MY_REASON, SQL_STATE, VENDOR_CODE)); + + defaultJDBCAdapter.doCreateTables(transactionContext); + + InOrder inOrder = inOrder(lock, resultSet, connection, statement1, statement2); + inOrder.verify(lock).lock(); + inOrder.verify(resultSet).next(); + inOrder.verify(resultSet).close(); + inOrder.verify(connection).createStatement(); + inOrder.verify(statement1).execute(CREATE_STATEMENT1); + inOrder.verify(statement1).close(); + inOrder.verify(connection).createStatement(); + inOrder.verify(statement2).execute(CREATE_STATEMENT2); + inOrder.verify(statement2).close(); + inOrder.verify(lock).unlock(); + + assertEquals(3, loggingEvents.size()); + assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1); + assertLog(1, DEBUG, "Could not create JDBC tables; The message table already existed. 
Failure was: " + CREATE_STATEMENT1 + " Message: " + MY_REASON + " SQLState: " + SQL_STATE + " Vendor code: " + VENDOR_CODE); + assertLog(2, DEBUG, "Executing SQL: " + CREATE_STATEMENT2); + } + + @Test + public void commitsTheTransactionWhenAutoCommitIsDisabled() throws SQLException, IOException { + when(connection.getAutoCommit()).thenReturn(false); + when(resultSet.next()).thenReturn(false); + + defaultJDBCAdapter.doCreateTables(transactionContext); + + InOrder inOrder = inOrder(lock, resultSet, connection, statement1, statement2); + inOrder.verify(lock).lock(); + inOrder.verify(resultSet).next(); + inOrder.verify(resultSet).close(); + inOrder.verify(connection).createStatement(); + inOrder.verify(statement1).execute(CREATE_STATEMENT1); + inOrder.verify(connection).commit(); + inOrder.verify(statement1).close(); + inOrder.verify(connection).createStatement(); + inOrder.verify(statement2).execute(CREATE_STATEMENT2); + inOrder.verify(connection).commit(); + inOrder.verify(statement2).close(); + inOrder.verify(lock).unlock(); + + assertEquals(2, loggingEvents.size()); + assertLog(0, DEBUG, "Executing SQL: " + CREATE_STATEMENT1); + assertLog(1, DEBUG, "Executing SQL: " + CREATE_STATEMENT2); + } + + private void assertLog(int messageNumber, Level level, String message) { + LoggingEvent loggingEvent = loggingEvents.get(messageNumber); + assertEquals(level, loggingEvent.getLevel()); + assertEquals(message, loggingEvent.getMessage()); + } +} \ No newline at end of file
