This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
     new dc60c3c  [AMQ-7473] Add stopOnError configuration to stop the broker 
when locker has an exception
dc60c3c is described below

commit dc60c3c6114e2008c1aed70f5be43b2d799f5d64
Author: jbonofre <[email protected]>
AuthorDate: Mon May 4 11:08:50 2020 +0200

    [AMQ-7473] Add stopOnError configuration to stop the broker when locker has 
an exception
    
    (cherry picked from commit e9caa75b1a600ea61f98b1f71d16c0443e2f5b41)
---
 .../org/apache/activemq/broker/AbstractLocker.java |  1 +
 .../java/org/apache/activemq/broker/Lockable.java  |  7 ++
 .../activemq/broker/LockableServiceSupport.java    | 16 +++++
 .../store/jdbc/JDBCPersistenceAdapter.java         | 30 +++++++-
 .../activemq/store/jdbc/TransactionContext.java    | 82 ++++++++++++++++++----
 .../java/org/apache/activemq/bugs/AMQ4636Test.java |  2 +-
 .../activemq/bugs/TrapMessageInJDBCStoreTest.java  |  2 +-
 .../store/jdbc/TransactionContextTest.java         |  2 +-
 8 files changed, 125 insertions(+), 17 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java
index 38aab9b..e93d9d9 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/AbstractLocker.java
@@ -57,4 +57,5 @@ public abstract class AbstractLocker extends ServiceSupport 
implements Locker {
     public void setLockable(LockableServiceSupport lockableServiceSupport) {
         this.lockable = lockableServiceSupport;
     }
+
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/Lockable.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/Lockable.java
index 734db55..9aab126 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/Lockable.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Lockable.java
@@ -32,6 +32,13 @@ public interface Lockable {
     public void setUseLock(boolean useLock);
 
     /**
+     * Stop the broker if the locker get an exception while processing lock.
+     *
+     * @param stopOnError
+     */
+    public void setStopOnError(boolean stopOnError);
+
+    /**
      * Create a default locker
      *
      * @return default locker
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
index 786eefb..d842788 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
@@ -36,6 +36,7 @@ public abstract class LockableServiceSupport extends 
ServiceSupport implements L
 
     private static final Logger LOG = 
LoggerFactory.getLogger(LockableServiceSupport.class);
     boolean useLock = true;
+    boolean stopOnError = false;
     Locker locker;
     long lockKeepAlivePeriod = 0;
     private ScheduledFuture<?> keepAliveTicket;
@@ -59,6 +60,15 @@ public abstract class LockableServiceSupport extends 
ServiceSupport implements L
     }
 
     @Override
+    public void setStopOnError(boolean stopOnError) {
+        this.stopOnError = stopOnError;
+    }
+
+    public boolean isStopOnError() {
+        return this.stopOnError;
+    }
+
+    @Override
     public void setLocker(Locker locker) throws IOException {
         this.locker = locker;
         locker.setLockable(this);
@@ -129,8 +139,14 @@ public abstract class LockableServiceSupport extends 
ServiceSupport implements L
                 }
             }
         } catch (SuppressReplyException e) {
+            if (stopOnError) {
+                stop = true;
+            }
             LOG.warn("locker keepAlive resulted in", e);
         } catch (IOException e) {
+            if (stopOnError) {
+                stop = true;
+            }
             LOG.warn("locker keepAlive resulted in", e);
         }
         if (stop) {
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index 38a2772..e327e38 100644
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -93,6 +93,8 @@ public class JDBCPersistenceAdapter extends 
DataSourceServiceSupport implements
     private int transactionIsolation;
     private File directory;
     private boolean changeAutoCommitAllowed = true;
+    private int queryTimeout = -1;
+    private int networkTimeout = -1;
 
     protected int maxProducersToAudit=1024;
     protected int maxAuditDepth=1000;
@@ -504,7 +506,7 @@ public class JDBCPersistenceAdapter extends 
DataSourceServiceSupport implements
     }
 
     public TransactionContext getTransactionContext() throws IOException {
-        TransactionContext answer = new TransactionContext(this);
+        TransactionContext answer = new TransactionContext(this, 
networkTimeout, queryTimeout);
         if (transactionIsolation > 0) {
             answer.setTransactionIsolation(transactionIsolation);
         }
@@ -555,6 +557,32 @@ public class JDBCPersistenceAdapter extends 
DataSourceServiceSupport implements
         this.changeAutoCommitAllowed = changeAutoCommitAllowed;
     }
 
+    public int getNetworkTimeout() {
+        return networkTimeout;
+    }
+
+    /**
+     * Define the JDBC connection network timeout.
+     *
+     * @param networkTimeout the connection network timeout (in milliseconds).
+     */
+    public void setNetworkTimeout(int networkTimeout) {
+        this.networkTimeout = networkTimeout;
+    }
+
+    public int getQueryTimeout() {
+        return queryTimeout;
+    }
+
+    /**
+     * Define the JDBC statement query timeout.
+     *
+     * @param queryTimeout the statement query timeout (in seconds).
+     */
+    public void setQueryTimeout(int queryTimeout) {
+        this.queryTimeout = queryTimeout;
+    }
+
     @Override
     public void deleteAllMessages() throws IOException {
         TransactionContext c = getTransactionContext();
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
index 7b0f61c..ba65e5b 100644
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
@@ -22,6 +22,7 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -49,10 +50,14 @@ public class TransactionContext {
     private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
     private LinkedList<Runnable> completions = new LinkedList<Runnable>();
     private ReentrantReadWriteLock exclusiveConnectionLock = new 
ReentrantReadWriteLock();
+    private int networkTimeout;
+    private int queryTimeout;
 
-    public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) 
throws IOException {
+    public TransactionContext(JDBCPersistenceAdapter persistenceAdapter, int 
networkTimeout, int queryTimeout) throws IOException {
         this.persistenceAdapter = persistenceAdapter;
         this.dataSource = persistenceAdapter.getDataSource();
+        this.networkTimeout = networkTimeout;
+        this.queryTimeout = queryTimeout;
     }
 
     public Connection getExclusiveConnection() throws IOException {
@@ -68,6 +73,9 @@ public class TransactionContext {
             toLock.lock();
             try {
                 connection = dataSource.getConnection();
+                if (networkTimeout > 0) {
+                    
connection.setNetworkTimeout(Executors.newSingleThreadExecutor(), 
networkTimeout);
+                }
                 if (persistenceAdapter.isChangeAutoCommitAllowed()) {
                     boolean autoCommit = !inTx;
                     if (connection.getAutoCommit() != autoCommit) {
@@ -300,17 +308,29 @@ public class TransactionContext {
         // simple delegate for the  rest of the impl..
         @Override
         public Statement createStatement() throws SQLException {
-            return delegate.createStatement();
+            Statement statement = delegate.createStatement();
+            if (queryTimeout > 0) {
+                statement.setQueryTimeout(queryTimeout);
+            }
+            return statement;
         }
 
         @Override
         public PreparedStatement prepareStatement(String sql) throws 
SQLException {
-            return delegate.prepareStatement(sql);
+            PreparedStatement statement = delegate.prepareStatement(sql);
+            if (queryTimeout > 0) {
+                statement.setQueryTimeout(queryTimeout);
+            }
+            return statement;
         }
 
         @Override
         public CallableStatement prepareCall(String sql) throws SQLException {
-            return delegate.prepareCall(sql);
+            CallableStatement statement = delegate.prepareCall(sql);
+            if (queryTimeout > 0) {
+                statement.setQueryTimeout(queryTimeout);
+            }
+            return statement;
         }
 
         @Override
@@ -390,17 +410,29 @@ public class TransactionContext {
 
         @Override
         public Statement createStatement(int resultSetType, int 
resultSetConcurrency) throws SQLException {
-            return delegate.createStatement(resultSetType, 
resultSetConcurrency);
+            Statement statement = delegate.createStatement(resultSetType, 
resultSetConcurrency);
+            if (queryTimeout > 0) {
+                statement.setQueryTimeout(queryTimeout);
+            }
+            return statement;
         }
 
         @Override
         public PreparedStatement prepareStatement(String sql, int 
resultSetType, int resultSetConcurrency) throws SQLException {
-            return delegate.prepareStatement(sql, resultSetType, 
resultSetConcurrency);
+            PreparedStatement statement = delegate.prepareStatement(sql, 
resultSetType, resultSetConcurrency);
+            if (queryTimeout > 0) {
+                statement.setQueryTimeout(queryTimeout);
+            }
+            return statement;
         }
 
         @Override
         public CallableStatement prepareCall(String sql, int resultSetType, 
int resultSetConcurrency) throws SQLException {
-            return delegate.prepareCall(sql, resultSetType, 
resultSetConcurrency);
+            CallableStatement statement = delegate.prepareCall(sql, 
resultSetType, resultSetConcurrency);
+            if (queryTimeout > 0) {
+                statement.setQueryTimeout(queryTimeout);
+            }
+            return statement;
         }
 
         @Override
@@ -445,32 +477,56 @@ public class TransactionContext {
 
         @Override
         public Statement createStatement(int resultSetType, int 
resultSetConcurrency, int resultSetHoldability) throws SQLException {
-            return delegate.createStatement(resultSetType, 
resultSetConcurrency, resultSetHoldability);
+            Statement statement = delegate.createStatement(resultSetType, 
resultSetConcurrency, resultSetHoldability);
+            if (queryTimeout > 0) {
+                statement.setQueryTimeout(queryTimeout);
+            }
+            return statement;
         }
 
         @Override
         public PreparedStatement prepareStatement(String sql, int 
resultSetType, int resultSetConcurrency, int resultSetHoldability) throws 
SQLException {
-            return delegate.prepareStatement(sql, resultSetType, 
resultSetConcurrency, resultSetHoldability);
+            PreparedStatement statement = delegate.prepareStatement(sql, 
resultSetType, resultSetConcurrency, resultSetHoldability);
+            if (queryTimeout > 0) {
+                statement.setQueryTimeout(queryTimeout);
+            }
+            return statement;
         }
 
         @Override
         public CallableStatement prepareCall(String sql, int resultSetType, 
int resultSetConcurrency, int resultSetHoldability) throws SQLException {
-            return delegate.prepareCall(sql, resultSetType, 
resultSetConcurrency, resultSetHoldability);
+            CallableStatement statement = delegate.prepareCall(sql, 
resultSetType, resultSetConcurrency, resultSetHoldability);
+            if (queryTimeout > 0) {
+                statement.setQueryTimeout(queryTimeout);
+            }
+            return statement;
         }
 
         @Override
         public PreparedStatement prepareStatement(String sql, int 
autoGeneratedKeys) throws SQLException {
-            return delegate.prepareStatement(sql, autoGeneratedKeys);
+            PreparedStatement statement = delegate.prepareStatement(sql, 
autoGeneratedKeys);
+            if (queryTimeout > 0) {
+                statement.setQueryTimeout(queryTimeout);
+            }
+            return statement;
         }
 
         @Override
         public PreparedStatement prepareStatement(String sql, int[] 
columnIndexes) throws SQLException {
-            return delegate.prepareStatement(sql, columnIndexes);
+            PreparedStatement statement = delegate.prepareStatement(sql, 
columnIndexes);
+            if (queryTimeout > 0) {
+                statement.setQueryTimeout(queryTimeout);
+            }
+            return statement;
         }
 
         @Override
         public PreparedStatement prepareStatement(String sql, String[] 
columnNames) throws SQLException {
-            return delegate.prepareStatement(sql, columnNames);
+            PreparedStatement statement = delegate.prepareStatement(sql, 
columnNames);
+            if (queryTimeout > 0) {
+                statement.setQueryTimeout(queryTimeout);
+            }
+            return statement;
         }
 
         @Override
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
index 9b397b0..7f88907 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
@@ -243,7 +243,7 @@ public class AMQ4636Test {
         public TestTransactionContext(
                 JDBCPersistenceAdapter jdbcPersistenceAdapter)
                 throws IOException {
-            super(jdbcPersistenceAdapter);
+            super(jdbcPersistenceAdapter, -1, -1);
         }
 
         @Override
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
index 18b7c21..ae1c442 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java
@@ -277,7 +277,7 @@ public class TrapMessageInJDBCStoreTest extends TestCase {
         public TestTransactionContext(
                 JDBCPersistenceAdapter jdbcPersistenceAdapter)
                 throws IOException {
-            super(jdbcPersistenceAdapter);
+            super(jdbcPersistenceAdapter, -1, -1);
         }
 
         public void executeBatch() throws SQLException {
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/TransactionContextTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/TransactionContextTest.java
index dcc5b27..9041f7d 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/TransactionContextTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/TransactionContextTest.java
@@ -41,7 +41,7 @@ public class TransactionContextTest {
 
     @Before
     public void setup() throws Exception {
-        underTest = new TransactionContext(jdbcPersistenceAdapter);
+        underTest = new TransactionContext(jdbcPersistenceAdapter, 0, 0);
     }
 
     @Test

Reply via email to