[
https://issues.apache.org/jira/browse/AMQ-4365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13597342#comment-13597342
]
Paul Gale edited comment on AMQ-4365 at 3/8/13 6:28 PM:
--------------------------------------------------------
I've got a first cut of a lease database locker that works with the
KahaDBPersistenceAdapter, shown below. It is by no means the ideal solution,
but it works. Feedback welcome.
Longer term, the signature of the configure method should be changed; I can
see no reason for it to be coupled to a PersistenceAdapter, which is too broad
a type. Either provide a few overloaded versions of configure() taking the
arguments a locker might require, or create an interface, to be implemented by
a persistence adapter, that makes the contract between the two more explicit.
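To illustrate the second option, here is a rough sketch of what a narrower contract could look like. Every name in it (LockerContext, SketchLocker, SketchAdapter) is hypothetical and none of it is ActiveMQ API; it only shows the shape of the idea, with plain stand-in types so that it compiles on its own.

```java
import java.util.Objects;

/** Hypothetical narrow contract: only what a lease locker needs from its host. */
interface LockerContext {
    String brokerName();
    long lockKeepAlivePeriodMillis();
}

/** Hypothetical locker that is configured from the narrow contract alone. */
final class SketchLocker {
    private String leaseHolderId;
    private long keepAlivePeriodMillis;

    void configure(LockerContext context) {
        Objects.requireNonNull(context, "context");
        // The locker reads only what it needs; it never sees the whole adapter.
        this.leaseHolderId = context.brokerName();
        this.keepAlivePeriodMillis = context.lockKeepAlivePeriodMillis();
    }

    String getLeaseHolderId() {
        return leaseHolderId;
    }

    long getKeepAlivePeriodMillis() {
        return keepAlivePeriodMillis;
    }
}

/** Stand-in for a persistence adapter (KahaDB, JDBC, ...) that implements the contract. */
final class SketchAdapter implements LockerContext {
    @Override
    public String brokerName() {
        return "theBroker";
    }

    @Override
    public long lockKeepAlivePeriodMillis() {
        return 2500L;
    }
}
```

With something along these lines, any adapter that implements the small interface could host the locker, and the locker would no longer need to cast or ignore its configure() argument.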
\\
\\
{code:title=MyLeaseDatabaseLocker.java}
package com.example.activemq;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.sql.DataSource;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyLeaseDatabaseLocker extends LeaseDatabaseLocker
        implements BrokerServiceAware, LockDataSourceCapable {

    private static final Logger LOG =
            LoggerFactory.getLogger(MyLeaseDatabaseLocker.class);

    protected BrokerService brokerService;

    // The adapter is ignored on purpose: the data source is injected through
    // setLockDataSource() and default statements are used.
    @Override
    public void configure(PersistenceAdapter ignore) throws IOException {
        this.statements = new Statements();
    }

    // Renews the lease; returns true only if exactly one row was updated.
    @Override
    public boolean keepAlive() throws IOException {
        boolean result = false;
        final String sql = statements.getLeaseUpdateStatement();
        LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql);

        Connection connection = null;
        PreparedStatement statement = null;
        try {
            connection = getConnection();
            initTimeDiff(connection);
            statement = connection.prepareStatement(sql);
            setQueryTimeout(statement);

            final long now = System.currentTimeMillis() + diffFromCurrentTime;
            statement.setString(1, getLeaseHolderId());
            statement.setLong(2, now + lockAcquireSleepInterval);
            statement.setString(3, getLeaseHolderId());
            result = (statement.executeUpdate() == 1);
        } catch (Exception e) {
            LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e);
            IOException ioe = IOExceptionSupport.create(e);
            // Guard against the broker service not having been injected.
            if (brokerService != null) {
                brokerService.handleIOException(ioe);
            }
            throw ioe;
        } finally {
            close(statement);
            close(connection);
        }
        return result;
    }

    @Override
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }

    // Falls back to the broker name when no lease holder id is configured.
    @Override
    public String getLeaseHolderId() {
        if (leaseHolderId == null && brokerService != null) {
            leaseHolderId = brokerService.getBrokerName();
        }
        return leaseHolderId;
    }

    @Override
    public void setLockDataSource(DataSource lockDataSource) {
        this.dataSource = lockDataSource;
    }

    @Override
    public DataSource getLockDataSource() {
        return this.dataSource;
    }

    private void setQueryTimeout(PreparedStatement statement) throws SQLException {
        if (queryTimeout > 0) {
            statement.setQueryTimeout(queryTimeout);
        }
    }

    private Connection getConnection() throws SQLException {
        return dataSource.getConnection();
    }

    private void close(Connection connection) {
        if (null == connection) return;
        try {
            connection.close();
        } catch (SQLException e1) {
            LOG.debug(getLeaseHolderId()
                    + ", caught exception while closing connection: " + e1, e1);
        }
    }

    private void close(PreparedStatement statement) {
        if (null == statement) return;
        try {
            statement.close();
        } catch (SQLException e1) {
            LOG.debug(getLeaseHolderId()
                    + ", caught exception while closing statement: " + e1, e1);
        }
    }
}
{code}
{code:title=LockDataSourceCapable.java}
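package com.example.activemq;

import javax.sql.DataSource;

// Lets the locker's data source be injected without a JDBC persistence adapter.
public interface LockDataSourceCapable {
    void setLockDataSource(DataSource lockDataSource);
    DataSource getLockDataSource();
}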
{code}
{code:xml|title=activemq.xml (with parts removed)}
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">
  <broker xmlns="http://activemq.apache.org/schema/core"
          id="theBroker">
    <persistenceAdapter>
      <kahaDB directory="${activemq.data}/kahadb"
              lockKeepAlivePeriod="2500"
              useLock="true">
        <locker>
          <bean xmlns="http://www.springframework.org/schema/beans"
                class="com.example.activemq.MyLeaseDatabaseLocker">
            <property name="lockDataSource" ref="mysql-ds"/>
            <property name="brokerService" ref="theBroker"/>
            <property name="failIfLocked" value="false"/>
            <property name="lockAcquireSleepInterval" value="5000"/>
          </bean>
        </locker>
      </kahaDB>
    </persistenceAdapter>
  </broker>
  <bean id="mysql-ds"
        class="org.apache.commons.dbcp.BasicDataSource"
        destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url"
              value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="whatever"/>
    <property name="password" value="whatever"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>
</beans>
{code}
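A note on the timing in the configuration above: keepAlive() pushes the lease expiry out to the database-adjusted current time plus lockAcquireSleepInterval (5000 ms), and it is invoked every lockKeepAlivePeriod (2500 ms). The small sketch below only restates that arithmetic so the relationship between the two values is easy to check. LeaseTiming and its method names are made up for illustration and are not part of ActiveMQ.

```java
/** Hypothetical helper that restates the lease arithmetic used in keepAlive(). */
final class LeaseTiming {

    private LeaseTiming() {
    }

    /**
     * Expiry written by a keep-alive: local clock, corrected by the offset to
     * the database clock, plus the lease duration (lockAcquireSleepInterval).
     */
    static long leaseExpiry(long localNowMillis,
                            long diffFromCurrentTimeMillis,
                            long lockAcquireSleepIntervalMillis) {
        return localNowMillis + diffFromCurrentTimeMillis + lockAcquireSleepIntervalMillis;
    }

    /**
     * True when the lease is renewed before it can lapse, i.e. the keep-alive
     * period is positive and shorter than the lease duration.
     */
    static boolean renewsBeforeExpiry(long lockKeepAlivePeriodMillis,
                                      long lockAcquireSleepIntervalMillis) {
        return lockKeepAlivePeriodMillis > 0
                && lockKeepAlivePeriodMillis < lockAcquireSleepIntervalMillis;
    }
}
```

With the values used here (2500 and 5000) the lease is renewed roughly twice per lease period, which leaves room for one slow or missed keep-alive before another broker could take the lease.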
> Allow the Lease Locker to be used without a JDBCPersistenceAdapter - so it
> can be a broker lock
> ------------------------------------------------------------------------------------------------
>
> Key: AMQ-4365
> URL: https://issues.apache.org/jira/browse/AMQ-4365
> Project: ActiveMQ
> Issue Type: Improvement
> Affects Versions: 5.8.0
> Reporter: Gary Tully
> Fix For: 5.9.0
>
>
> The locker interface needs another configure option to provide a broker
> service, or needs to be brokerService aware, so that a locker can get its
> identity and access to the IO exception handlers.
> The lease database locker depends on the JDBC persistence adapter to get its
> statements and data source. It should be possible to configure these
> independently, so that it can be used standalone as a broker lock. So, setters
> for each.
> This will help sort out some of the dependencies between broker and lock
> implementations. It also makes it possible to use a lease lock with KahaDB,
> for example.
> Some context:
> http://mail-archives.apache.org/mod_mbox/activemq-users/201303.mbox/%3ccaj5znhuruz+aewsaabajtwbbpkwn06ryyyt6nqsdg_su7vm...@mail.gmail.com%3E
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira