This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new e9caa75 [AMQ-7473] Add stopOnError configuration to stop the broker
when locker has an exception
new 0fd8e52 Merge pull request #525 from jbonofre/AMQ-7473
e9caa75 is described below
commit e9caa75b1a600ea61f98b1f71d16c0443e2f5b41
Author: jbonofre <[email protected]>
AuthorDate: Mon May 4 11:08:50 2020 +0200
[AMQ-7473] Add stopOnError configuration to stop the broker when locker has
an exception
---
.../org/apache/activemq/broker/AbstractLocker.java | 1 +
.../java/org/apache/activemq/broker/Lockable.java | 7 ++
.../activemq/broker/LockableServiceSupport.java | 16 +++++
.../store/jdbc/JDBCPersistenceAdapter.java | 30 +++++++-
.../activemq/store/jdbc/TransactionContext.java | 82 ++++++++++++++++++----
.../java/org/apache/activemq/bugs/AMQ4636Test.java | 2 +-
.../activemq/bugs/TrapMessageInJDBCStoreTest.java | 2 +-
.../jdbc/JmsTransactionCommitFailureTest.java | 2 +-
.../store/jdbc/TransactionContextTest.java | 2 +-
9 files changed, 126 insertions(+), 18 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java
index 38aab9b..e93d9d9 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java
@@ -57,4 +57,5 @@ public abstract class AbstractLocker extends ServiceSupport
implements Locker {
public void setLockable(LockableServiceSupport lockableServiceSupport) {
this.lockable = lockableServiceSupport;
}
+
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/Lockable.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/Lockable.java
index 734db55..9aab126 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/Lockable.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Lockable.java
@@ -32,6 +32,13 @@ public interface Lockable {
public void setUseLock(boolean useLock);
/**
+ * Stop the broker if the locker gets an exception while processing the lock.
+ *
+ * @param stopOnError
+ */
+ public void setStopOnError(boolean stopOnError);
+
+ /**
* Create a default locker
*
* @return default locker
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
index 786eefb..d842788 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
@@ -36,6 +36,7 @@ public abstract class LockableServiceSupport extends
ServiceSupport implements L
private static final Logger LOG =
LoggerFactory.getLogger(LockableServiceSupport.class);
boolean useLock = true;
+ boolean stopOnError = false;
Locker locker;
long lockKeepAlivePeriod = 0;
private ScheduledFuture<?> keepAliveTicket;
@@ -59,6 +60,15 @@ public abstract class LockableServiceSupport extends
ServiceSupport implements L
}
@Override
+ public void setStopOnError(boolean stopOnError) {
+ this.stopOnError = stopOnError;
+ }
+
+ public boolean isStopOnError() {
+ return this.stopOnError;
+ }
+
+ @Override
public void setLocker(Locker locker) throws IOException {
this.locker = locker;
locker.setLockable(this);
@@ -129,8 +139,14 @@ public abstract class LockableServiceSupport extends
ServiceSupport implements L
}
}
} catch (SuppressReplyException e) {
+ if (stopOnError) {
+ stop = true;
+ }
LOG.warn("locker keepAlive resulted in", e);
} catch (IOException e) {
+ if (stopOnError) {
+ stop = true;
+ }
LOG.warn("locker keepAlive resulted in", e);
}
if (stop) {
diff --git
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index 8430328..eaddfb8 100644
---
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -94,6 +94,8 @@ public class JDBCPersistenceAdapter extends
DataSourceServiceSupport implements
private int transactionIsolation;
private File directory;
private boolean changeAutoCommitAllowed = true;
+ private int queryTimeout = -1;
+ private int networkTimeout = -1;
protected int maxProducersToAudit=1024;
protected int maxAuditDepth=1000;
@@ -513,7 +515,7 @@ public class JDBCPersistenceAdapter extends
DataSourceServiceSupport implements
}
public TransactionContext getTransactionContext() throws IOException {
- TransactionContext answer = new TransactionContext(this);
+ TransactionContext answer = new TransactionContext(this,
networkTimeout, queryTimeout);
if (transactionIsolation > 0) {
answer.setTransactionIsolation(transactionIsolation);
}
@@ -564,6 +566,32 @@ public class JDBCPersistenceAdapter extends
DataSourceServiceSupport implements
this.changeAutoCommitAllowed = changeAutoCommitAllowed;
}
+ public int getNetworkTimeout() {
+ return networkTimeout;
+ }
+
+ /**
+ * Define the JDBC connection network timeout.
+ *
+ * @param networkTimeout the connection network timeout (in milliseconds).
+ */
+ public void setNetworkTimeout(int networkTimeout) {
+ this.networkTimeout = networkTimeout;
+ }
+
+ public int getQueryTimeout() {
+ return queryTimeout;
+ }
+
+ /**
+ * Define the JDBC statement query timeout.
+ *
+ * @param queryTimeout the statement query timeout (in seconds).
+ */
+ public void setQueryTimeout(int queryTimeout) {
+ this.queryTimeout = queryTimeout;
+ }
+
@Override
public void deleteAllMessages() throws IOException {
TransactionContext c = getTransactionContext();
diff --git
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
index db2aace..a820088 100644
---
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
+++
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
@@ -22,6 +22,7 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -49,10 +50,14 @@ public class TransactionContext {
private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
private LinkedList<Runnable> completions = new LinkedList<Runnable>();
private ReentrantReadWriteLock exclusiveConnectionLock = new
ReentrantReadWriteLock();
+ private int networkTimeout;
+ private int queryTimeout;
- public TransactionContext(JDBCPersistenceAdapter persistenceAdapter)
throws IOException {
+ public TransactionContext(JDBCPersistenceAdapter persistenceAdapter, int
networkTimeout, int queryTimeout) throws IOException {
this.persistenceAdapter = persistenceAdapter;
this.dataSource = persistenceAdapter.getDataSource();
+ this.networkTimeout = networkTimeout;
+ this.queryTimeout = queryTimeout;
}
public Connection getExclusiveConnection() throws IOException {
@@ -68,6 +73,9 @@ public class TransactionContext {
toLock.lock();
try {
connection = dataSource.getConnection();
+ if (networkTimeout > 0) {
+
connection.setNetworkTimeout(Executors.newSingleThreadExecutor(),
networkTimeout);
+ }
if (persistenceAdapter.isChangeAutoCommitAllowed()) {
boolean autoCommit = !inTx;
if (connection.getAutoCommit() != autoCommit) {
@@ -300,17 +308,29 @@ public class TransactionContext {
// simple delegate for the rest of the impl..
@Override
public Statement createStatement() throws SQLException {
- return delegate.createStatement();
+ Statement statement = delegate.createStatement();
+ if (queryTimeout > 0) {
+ statement.setQueryTimeout(queryTimeout);
+ }
+ return statement;
}
@Override
public PreparedStatement prepareStatement(String sql) throws
SQLException {
- return delegate.prepareStatement(sql);
+ PreparedStatement statement = delegate.prepareStatement(sql);
+ if (queryTimeout > 0) {
+ statement.setQueryTimeout(queryTimeout);
+ }
+ return statement;
}
@Override
public CallableStatement prepareCall(String sql) throws SQLException {
- return delegate.prepareCall(sql);
+ CallableStatement statement = delegate.prepareCall(sql);
+ if (queryTimeout > 0) {
+ statement.setQueryTimeout(queryTimeout);
+ }
+ return statement;
}
@Override
@@ -390,17 +410,29 @@ public class TransactionContext {
@Override
public Statement createStatement(int resultSetType, int
resultSetConcurrency) throws SQLException {
- return delegate.createStatement(resultSetType,
resultSetConcurrency);
+ Statement statement = delegate.createStatement(resultSetType,
resultSetConcurrency);
+ if (queryTimeout > 0) {
+ statement.setQueryTimeout(queryTimeout);
+ }
+ return statement;
}
@Override
public PreparedStatement prepareStatement(String sql, int
resultSetType, int resultSetConcurrency) throws SQLException {
- return delegate.prepareStatement(sql, resultSetType,
resultSetConcurrency);
+ PreparedStatement statement = delegate.prepareStatement(sql,
resultSetType, resultSetConcurrency);
+ if (queryTimeout > 0) {
+ statement.setQueryTimeout(queryTimeout);
+ }
+ return statement;
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType,
int resultSetConcurrency) throws SQLException {
- return delegate.prepareCall(sql, resultSetType,
resultSetConcurrency);
+ CallableStatement statement = delegate.prepareCall(sql,
resultSetType, resultSetConcurrency);
+ if (queryTimeout > 0) {
+ statement.setQueryTimeout(queryTimeout);
+ }
+ return statement;
}
@Override
@@ -445,32 +477,56 @@ public class TransactionContext {
@Override
public Statement createStatement(int resultSetType, int
resultSetConcurrency, int resultSetHoldability) throws SQLException {
- return delegate.createStatement(resultSetType,
resultSetConcurrency, resultSetHoldability);
+ Statement statement = delegate.createStatement(resultSetType,
resultSetConcurrency, resultSetHoldability);
+ if (queryTimeout > 0) {
+ statement.setQueryTimeout(queryTimeout);
+ }
+ return statement;
}
@Override
public PreparedStatement prepareStatement(String sql, int
resultSetType, int resultSetConcurrency, int resultSetHoldability) throws
SQLException {
- return delegate.prepareStatement(sql, resultSetType,
resultSetConcurrency, resultSetHoldability);
+ PreparedStatement statement = delegate.prepareStatement(sql,
resultSetType, resultSetConcurrency, resultSetHoldability);
+ if (queryTimeout > 0) {
+ statement.setQueryTimeout(queryTimeout);
+ }
+ return statement;
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType,
int resultSetConcurrency, int resultSetHoldability) throws SQLException {
- return delegate.prepareCall(sql, resultSetType,
resultSetConcurrency, resultSetHoldability);
+ CallableStatement statement = delegate.prepareCall(sql,
resultSetType, resultSetConcurrency, resultSetHoldability);
+ if (queryTimeout > 0) {
+ statement.setQueryTimeout(queryTimeout);
+ }
+ return statement;
}
@Override
public PreparedStatement prepareStatement(String sql, int
autoGeneratedKeys) throws SQLException {
- return delegate.prepareStatement(sql, autoGeneratedKeys);
+ PreparedStatement statement = delegate.prepareStatement(sql,
autoGeneratedKeys);
+ if (queryTimeout > 0) {
+ statement.setQueryTimeout(queryTimeout);
+ }
+ return statement;
}
@Override
public PreparedStatement prepareStatement(String sql, int[]
columnIndexes) throws SQLException {
- return delegate.prepareStatement(sql, columnIndexes);
+ PreparedStatement statement = delegate.prepareStatement(sql,
columnIndexes);
+ if (queryTimeout > 0) {
+ statement.setQueryTimeout(queryTimeout);
+ }
+ return statement;
}
@Override
public PreparedStatement prepareStatement(String sql, String[]
columnNames) throws SQLException {
- return delegate.prepareStatement(sql, columnNames);
+ PreparedStatement statement = delegate.prepareStatement(sql,
columnNames);
+ if (queryTimeout > 0) {
+ statement.setQueryTimeout(queryTimeout);
+ }
+ return statement;
}
@Override
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
index 9b397b0..7f88907 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
@@ -243,7 +243,7 @@ public class AMQ4636Test {
public TestTransactionContext(
JDBCPersistenceAdapter jdbcPersistenceAdapter)
throws IOException {
- super(jdbcPersistenceAdapter);
+ super(jdbcPersistenceAdapter, -1, -1);
}
@Override
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
index 18b7c21..ae1c442 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
@@ -277,7 +277,7 @@ public class TrapMessageInJDBCStoreTest extends TestCase {
public TestTransactionContext(
JDBCPersistenceAdapter jdbcPersistenceAdapter)
throws IOException {
- super(jdbcPersistenceAdapter);
+ super(jdbcPersistenceAdapter, -1, -1);
}
public void executeBatch() throws SQLException {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JmsTransactionCommitFailureTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JmsTransactionCommitFailureTest.java
index db7d156..0ef2bec 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JmsTransactionCommitFailureTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JmsTransactionCommitFailureTest.java
@@ -335,7 +335,7 @@ public class JmsTransactionCommitFailureTest {
@Override
public TransactionContext getTransactionContext() throws IOException {
- TransactionContext answer = new TransactionContext(this) {
+ TransactionContext answer = new TransactionContext(this, -1, -1) {
@Override
public void executeBatch() throws SQLException {
if (isCommitFailureEnabled) {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/TransactionContextTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/TransactionContextTest.java
index dcc5b27..9041f7d 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/TransactionContextTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/TransactionContextTest.java
@@ -41,7 +41,7 @@ public class TransactionContextTest {
@Before
public void setup() throws Exception {
- underTest = new TransactionContext(jdbcPersistenceAdapter);
+ underTest = new TransactionContext(jdbcPersistenceAdapter, 0, 0);
}
@Test