[ 
https://issues.apache.org/jira/browse/AMQ-4365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13597342#comment-13597342
 ] 

Paul Gale edited comment on AMQ-4365 at 3/8/13 6:10 PM:
--------------------------------------------------------

So I've been able to get a first cut at a lease database locker that works with 
the KahaDBPersistenceAdapter, as shown below. This is by no means the ideal 
solution but it works. Feedback welcome.

Longer term, the signature of the configure method should be changed; there is 
no reason that I can see why it should be coupled to a PersistenceAdapter - 
that's too broad. Either provide a few overloaded versions of configure() with 
various arguments that a locker might require or create an interface to be 
implemented by a persistence provider that makes the contract between the two 
more explicit.
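
To make the second option concrete, here is a rough sketch of what such a contract might look like. Every name in it (LockerContext, ConfigurableLocker, ExampleLocker) is hypothetical and not part of ActiveMQ; it only illustrates a locker asking for an identity, a data source and an IO exception handler instead of a whole PersistenceAdapter.

```java
import java.io.IOException;
import javax.sql.DataSource;

// Hypothetical contract: only what a locker needs from its host.
// None of these types exist in ActiveMQ; they are illustration only.
interface LockerContext {
    String getLockHolderId();               // identity used as the lease holder
    DataSource getLockDataSource();         // may be null for non-JDBC lockers
    void handleIOException(IOException e);  // hook into the host's IO exception handling
}

// Hypothetical narrower alternative to configure(PersistenceAdapter).
interface ConfigurableLocker {
    void configure(LockerContext context) throws IOException;
}

// A locker written against the narrow contract never sees the adapter.
class ExampleLocker implements ConfigurableLocker {
    private String holderId;
    private DataSource dataSource;

    @Override
    public void configure(LockerContext context) {
        this.holderId = context.getLockHolderId();
        this.dataSource = context.getLockDataSource();
    }

    String getHolderId() {
        return holderId;
    }

    DataSource getDataSource() {
        return dataSource;
    }
}
```

With something along these lines, either the broker or a persistence adapter could implement the context, and the locker would not care which.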
\\
\\
{code:title=MyLeaseDatabaseLocker.java}
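package com.example.activemq;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.sql.DataSource;

// Package locations as of ActiveMQ 5.8; adjust if your version differs.
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyLeaseDatabaseLocker extends LeaseDatabaseLocker
    implements BrokerServiceAware, LockDataSourceCapable {
  private static final Logger LOG =
      LoggerFactory.getLogger(MyLeaseDatabaseLocker.class);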
  protected BrokerService brokerService;

  @Override
  public void configure(PersistenceAdapter ignore) throws IOException {
    this.statements = new Statements();
  }

  @Override
  public boolean keepAlive() throws IOException {
    boolean result = false;
    final String sql = statements.getLeaseUpdateStatement();

    LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql);

    Connection connection = null;
    PreparedStatement statement = null;

    try {
      connection = getConnection();

      initTimeDiff(connection);
      statement = connection.prepareStatement(sql);
      setQueryTimeout(statement);

      final long now = System.currentTimeMillis() + diffFromCurrentTime;
      statement.setString(1, getLeaseHolderId());
      statement.setLong(2, now + lockAcquireSleepInterval);
      statement.setString(3, getLeaseHolderId());

      result = (statement.executeUpdate() == 1);
    }
    catch(Exception e) {
      LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e);
      IOException ioe = IOExceptionSupport.create(e);
      brokerService.handleIOException(ioe);
      throw ioe;
    }
    finally {
      close(statement);
      close(connection);
    }

    return result;
  }

  @Override
  public void setBrokerService(BrokerService brokerService) {
    this.brokerService = brokerService;
  }

  @Override
  public String getLeaseHolderId() {
    if(leaseHolderId == null && brokerService != null) {
      leaseHolderId = brokerService.getBrokerName();
    }

    return leaseHolderId;
  }

  @Override
  public void setLockDataSource(DataSource lockDataSource) {
    this.dataSource = lockDataSource;
  }

  @Override
  public DataSource getLockDataSource() {
    return this.dataSource;
  }

  private void setQueryTimeout(PreparedStatement statement)
      throws SQLException {
    if(queryTimeout > 0) {
      statement.setQueryTimeout(queryTimeout);
    }
  }

  private Connection getConnection() throws SQLException {
    return dataSource.getConnection();
  }

  private void close(Connection connection) {
    if(null == connection) return;

    try {
      connection.close();
    }
    catch(SQLException e1) {
      LOG.debug(getLeaseHolderId()
          + " caught exception while closing connection: " + e1, e1);
    }
  }

  private void close(PreparedStatement statement) {
    if(null == statement) return;

    try {
      statement.close();
    }
    catch(SQLException e1) {
      LOG.debug(getLeaseHolderId()
          + ", caught while closing statement: " + e1, e1);
    }
  }
}
{code}

{code:title=LockDataSourceCapable.java}
package com.example.activemq;

import javax.sql.DataSource;

public interface LockDataSourceCapable {
  public void setLockDataSource(DataSource lockDataSource);
  public DataSource getLockDataSource();
}
{code}

{code:xml|title=activemq.xml (with parts removed)}
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://activemq.apache.org/schema/core
                        http://activemq.apache.org/schema/core/activemq-core.xsd">

  <broker xmlns="http://activemq.apache.org/schema/core"
          id="theBroker">

    <persistenceAdapter>
      <kahaDB directory="${activemq.data}/kahadb"
              lockKeepAlivePeriod="2500"
              useLock="true">
        <locker>
          <bean xmlns="http://www.springframework.org/schema/beans"
                class="com.example.activemq.MyLeaseDatabaseLocker">
            <property name="lockDataSource" ref="mysql-ds"/>
            <property name="brokerService" ref="theBroker"/>
            <property name="failIfLocked" value="false"/>
            <property name="lockAcquireSleepInterval" value="5000"/>
          </bean>
        </locker>
      </kahaDB>
    </persistenceAdapter>

  </broker>

  <bean id="mysql-ds"
        class="org.apache.commons.dbcp.BasicDataSource"
        destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url"
              value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="whatever"/>
    <property name="password" value="whatever"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>
</beans>
{code}
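
The two timing values in this configuration interact with the expiry that keepAlive() writes (now + lockAcquireSleepInterval). The sketch below just spells out that arithmetic; LeaseTiming and its method names are made up for illustration, and it assumes keepAlive() is invoked once every lockKeepAlivePeriod milliseconds.

```java
// Hypothetical helper; it only restates the arithmetic used in keepAlive().
final class LeaseTiming {
    private LeaseTiming() {
    }

    // Expiry written by a renewal:
    // (local clock + diffFromCurrentTime) + lockAcquireSleepInterval.
    static long leaseExpiry(long localMillis, long diffFromCurrentTime,
                            long lockAcquireSleepInterval) {
        return localMillis + diffFromCurrentTime + lockAcquireSleepInterval;
    }

    // Milliseconds still left on the lease when the next keepAlive is due,
    // assuming renewals run every lockKeepAlivePeriod milliseconds.
    static long marginAtNextKeepAlive(long lockKeepAlivePeriod,
                                      long lockAcquireSleepInterval) {
        return lockAcquireSleepInterval - lockKeepAlivePeriod;
    }
}
```

Under that assumption, lockKeepAlivePeriod="2500" and lockAcquireSleepInterval="5000" leave 2500 ms on the lease each time a renewal comes due. A keep-alive period equal to or longer than the sleep interval would leave no margin at all.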

                
> Allow the Lease Locker to be used with out a JDBCPersistenceAdapter - so it 
> can be a broker lock
> ------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4365
>                 URL: https://issues.apache.org/jira/browse/AMQ-4365
>             Project: ActiveMQ
>          Issue Type: Improvement
>    Affects Versions: 5.8.0
>            Reporter: Gary Tully
>             Fix For: 5.9.0
>
>
> The locker interface needs another configure option to provide a broker 
> service, or needs to be brokerService aware so that a locker can get identity 
> and access to the io exception handlers.
> The lease database locker is dependent on the jdbc pa to get statements and 
> data source. It should be possible to configure these independently such that 
> it can be used standalone as a broker lock. So setters for each.
> This will help sort out some of the dependencies between broker and lock 
> implementations. also making it possible to use a lease lock with kahadb for 
> example.
> some context: 
> http://mail-archives.apache.org/mod_mbox/activemq-users/201303.mbox/%3ccaj5znhuruz+aewsaabajtwbbpkwn06ryyyt6nqsdg_su7vm...@mail.gmail.com%3E

