User: patriot1burke
Date: 01/08/08 15:02:21
Added: src/main/org/jboss/ejb/plugins/lock
QueuedPessimisticEJBLock.java
Log:
new FIFO txWait queue. Should have better performance
Revision Changes Path
1.1
jboss/src/main/org/jboss/ejb/plugins/lock/QueuedPessimisticEJBLock.java
Index: QueuedPessimisticEJBLock.java
===================================================================
/*
* JBoss, the OpenSource EJB server
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.ejb.plugins.lock;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.Collections;
import java.lang.reflect.Method;
import javax.transaction.Transaction;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.TransactionManager;
import javax.transaction.RollbackException;
import javax.ejb.EJBObject;
import org.jboss.ejb.MethodInvocation;
import org.jboss.logging.log4j.JBossCategory;
import java.io.FileOutputStream;
import java.io.PrintStream;
/**
* This class is holds threads awaiting the transactional lock to be free
* in a fair FIFO transactional queue. Non-transactional threads
* are also put in this wait queue as well. Unlike SimplePessimisticEJBLock which
notifies all
* threads on transaction completion, this class pops the next waiting transaction
from the queue
* and notifies only those threads waiting associated with that transaction. This
* class should perform better than Simple on high contention loads.
*
* Holds all locks for entity beans, not used for stateful. <p>
*
* All BeanLocks have a reference count.
* When the reference count goes to 0, the lock is released from the
* id -> lock mapping.
*
* @author <a href="[EMAIL PROTECTED]">Bill Burke</a>
*
* @version $Revision: 1.1 $
*
* <p><b>Revisions:</b><br>
* <p><b>2001/08/03: billb</b>
* <ol>
* <li>Initial revision
* </ol>
*/
public class QueuedPessimisticEJBLock extends BeanLockSupport
{
private Object methodLock = new Object();
private HashMap txLocks = new HashMap();
private LinkedList txWaitQueue = new LinkedList();
private int txIdGen = 0;
private class TxLock
{
public Transaction tx = null;
public int id = 0;
public String threadName;
public boolean isQueued;
public TxLock(Transaction tx)
{
this.threadName = Thread.currentThread().toString();
this.tx = tx;
if (tx == null)
{
if (txIdGen < 0) txIdGen = 0;
this.id = txIdGen++;
}
this.isQueued = true;
}
public boolean equals(Object obj)
{
if (obj == this) return true;
TxLock lock = (TxLock)obj;
if (lock.tx == null && this.tx == null)
{
return lock.id == this.id;
}
else if (lock.tx != null && this.tx != null)
{
return lock.tx.equals(this.tx);
}
return false;
}
public int hashCode()
{
return this.id;
}
}
protected TxLock getTxLock(Transaction miTx)
{
TxLock lock = null;
if (miTx == null)
{
// There is no transaction
lock = new TxLock(null);
txWaitQueue.addLast(lock);
}
else
{
TxLock key = new TxLock(miTx);
lock = (TxLock)txLocks.get(key);
if (lock == null)
{
txLocks.put(key, key);
txWaitQueue.addLast(key);
lock = key;
}
}
return lock;
}
protected boolean isTxExpired(Transaction miTx) throws Exception
{
if (miTx != null && miTx.getStatus() == Status.STATUS_MARKED_ROLLBACK)
{
return true;
}
return false;
}
public void schedule(MethodInvocation mi) throws Exception
{
boolean threadScheduled = false;
while (!threadScheduled)
{
/* loop on lock wakeup and restart trying to schedule */
threadScheduled = doSchedule(mi);
}
}
/**
* doSchedule(MethodInvocation)
*
* doSchedule implements a particular policy for scheduling the threads coming
in.
* There is always the spec required "serialization" but we can add custom
scheduling in here
*
* Synchronizing on lock: a failure to get scheduled must result in a wait() call
and a
* release of the lock. Schedulation must return with lock.
*
*/
protected boolean doSchedule(MethodInvocation mi)
throws Exception
{
this.sync();
boolean wasThreadScheduled = false;
Transaction miTx = mi.getTransaction();
boolean trace = log.isTraceEnabled();
try
{
if( trace ) log.trace("Begin schedule, key="+mi.getId());
if (isTxExpired(miTx))
{
log.error("Saw rolled back tx="+miTx);
throw new RuntimeException("Transaction marked for rollback, possibly a
timeout");
}
//Next test is independent of whether the context is locked or not, it is
purely transactional
// Is the instance involved with another transaction? if so we implement
pessimistic locking
wasThreadScheduled = waitForTx(miTx, trace);
// Here, we are trying to get the methodLock on the bean
try
{
boolean acquiredMethodLock = false;
while (!acquiredMethodLock)
{
acquiredMethodLock = attemptMethodLock(mi, trace);
if (!acquiredMethodLock)
{
if (miTx != null)
{
// This thread is involved with a transaction
// We need to check whether the transaction has timed
// out because we may have waited awhile in attemptMethodLock.
if (isTxExpired(miTx))
{
log.error("Saw rolled back tx="+miTx+" waiting for
methodLock."
// +" On method: " + mi.getMethod().getName()
// +" txWaitQueue size: " + txWaitQueue.size()
);
throw new RuntimeException("Transaction marked for rollback,
possibly a timeout");
}
}
else // non-transactional
{
// we're non-transactional so we must return false
// and re-do lock acquisition logic
return false;
}
}
} // end while(acquiredMethodLock)
// We successfully acquired method lock!
}
catch (Exception ex)
{
if (miTx != null)
{
// we have tx mutex so we must
// wakeup next transaction
nextTransaction();
}
throw ex;
}
}
finally
{
if (miTx == null // non-transactional
&& wasThreadScheduled)
{
// if this non-transctional thread was
// scheduled in txWaitQueue, we need to call nextTransaction
// Otherwise, threads in txWaitQueue will never wake up.
nextTransaction();
}
this.releaseSync();
}
//If we reach here we are properly scheduled to go through so return true
return true;
}
/**
* Wait until no other transaction is running with this lock.
*
* @return Returns true if this thread was scheduled in txWaitQueue
*/
protected boolean waitForTx(Transaction miTx, boolean trace) throws Exception
{
boolean wasScheduled = false;
// Do we have a running transaction with the context?
// We loop here until either until success or until transaction timeout
// If we get out of the loop successfully, we can successfully
// set the transaction on this puppy.
while (tx != null &&
// And are we trying to enter with another transaction?
!tx.equals(miTx))
{
wasScheduled = true;
// That's no good, only one transaction per context
// Let's put the thread to sleep the transaction demarcation will wake them
up
if( trace ) log.trace("Transactional contention on context"+id);
TxLock txLock = getTxLock(miTx);
if( trace ) log.trace("Begin wait on Tx="+tx);
// And lock the threads on the lock corresponding to the Tx in MI
synchronized(txLock)
{
releaseSync();
try
{
txLock.wait(txTimeout);
} catch (InterruptedException ignored) {}
} // end synchronized(txLock)
this.sync();
if( trace ) log.trace("End wait on TxLock="+tx);
if (isTxExpired(miTx))
{
log.error(Thread.currentThread() + "Saw rolled back tx="+miTx+" waiting
for txLock"
// +" On method: " + mi.getMethod().getName()
// +" txWaitQueue size: " + txWaitQueue.size()
);
if (txLock.isQueued)
{
// Remove the TxLock from the queue because this thread is exiting.
// Don't worry about notifying other threads that share the same
transaction.
// They will timeout and throw the below RuntimeException
txLocks.remove(txLock);
txWaitQueue.remove(txLock);
}
else if (tx != null && tx.equals(miTx))
{
// We're not qu
nextTransaction();
}
throw new RuntimeException("Transaction marked for rollback, possibly a
timeout");
}
} // end while(tx!=miTx)
return wasScheduled;
}
/**
* Attempt to acquire a method lock.
*/
protected boolean attemptMethodLock(MethodInvocation mi, boolean trace) throws
Exception
{
if (isMethodLocked())
{
// It is locked but re-entrant calls permitted (reentrant home ones are ok
as well)
if (!isCallAllowed(mi))
{
// This instance is in use and you are not permitted to reenter
// Go to sleep and wait for the lock to be released
// Threads finishing invocation will notify() on the lock
if( trace ) log.trace("Thread contention on methodLock, Begin
lock.wait(), id="+mi.getId());
synchronized(methodLock)
{
releaseSync();
try
{
methodLock.wait(txTimeout);
}
catch (InterruptedException ignored) {}
}
this.sync();
if( trace ) log.trace("End lock.wait(), id="+mi.getId()+",
isLocked="+isMethodLocked());
return false;
}
else
{
//We are in a valid reentrant call so add a method lock
addMethodLock();
}
}
// No one is using that instance
else
{
// We are now using the instance
addMethodLock();
}
// if we got here addMethodLock was called
return true;
}
/*
* nextTransaction()
*
* nextTransaction will
* - set the current tx to null
* - schedule the next transaction by notifying all threads waiting on the
transaction
* - setting the thread with the new transaction so there is no race with
incoming calls
*/
protected void nextTransaction()
{
if (!synched)
{
throw new IllegalStateException("do not call nextTransaction while not
synched!");
}
this.tx = null;
// is there a waiting list?
if (!txWaitQueue.isEmpty())
{
TxLock thelock = (TxLock) txWaitQueue.removeFirst();
txLocks.remove(thelock);
thelock.isQueued = false;
// The new transaction is the next one, important to set it up to avoid
race with
// new incoming calls
this.tx = thelock.tx;
// System.out.println(Thread.currentThread()+" handing off to
"+lock.threadName);
synchronized(thelock)
{
// notify All threads waiting on this transaction.
// They will enter the methodLock wait loop.
thelock.notifyAll();
}
}
else
{
// System.out.println(Thread.currentThread()+" handing off to empty
queue");
}
}
public void endTransaction(Transaction transaction)
{
nextTransaction();
}
public void wontSynchronize(Transaction trasaction)
{
nextTransaction();
}
/**
* releaseMethodLock
*
* if we reach the count of zero it means the instance is free from threads (and
reentrency)
* we wake up the next thread in the currentLock
*/
public void releaseMethodLock()
{
numMethodLocks--;
if (numMethodLocks == 0)
{
synchronized(methodLock) {methodLock.notify();}
}
}
/*
* For debugging purposes
private static int filecount = 0;
private static synchronized void saveStackTrace(String msg)
{
System.out.println("saving stack trace");
try
{
FileOutputStream fp = new FileOutputStream("d:/tmp/stacktraces/" +
Integer.toString(filecount++) + ".txt");
PrintStream ps = new PrintStream(fp);
ps.println(msg);
new Throwable().printStackTrace(ps);
ps.close();
fp.close();
}
catch (Exception ignored) {}
System.out.println("done saving stack trace");
}
*/
public void removeRef()
{
refs--;
if (refs == 0 && txWaitQueue.size() > 0)
{
// saveStackTrace("****removing bean lock and it has tx's in
QUEUE!***");
throw new IllegalStateException("removing bean lock and it has tx's in
QUEUE!");
}
else if (refs == 0 && tx != null)
{
// saveStackTrace("****removing bean lock and it has tx set!***");
throw new IllegalStateException("removing bean lock and it has tx set!");
}
/* else if (refs == 0)
System.out.println(Thread.currentThread() + " removing lock!");
*/
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development